Skip to content

Commit

Permalink
satellite: switch to use nodefilters instead of old placement.Allowed…
Browse files Browse the repository at this point in the history
…Country

placement.AllowedCountry is the old way to specify placement, with the new approach we can use a more generic (dynamic method), which can check full node information instead of just the country code.

The 90% of this patch is just search and replace:

 * we need to use NodeFilters instead of placement.AllowedCountry
 * which means, we need an initialized PlacementRules available everywhere
 * which means we need to configure the placement rules

The remaining 10% is the placement.go, where we introduced a new type of configuration (lightweight expression language) to define any kind of placement without code change.

Change-Id: Ie644b0b1840871b0e6bbcf80c6b50a947503d7df
  • Loading branch information
elek authored and Storj Robot committed Jul 7, 2023
1 parent e0b5476 commit 97a89c3
Show file tree
Hide file tree
Showing 32 changed files with 427 additions and 94 deletions.
10 changes: 6 additions & 4 deletions cmd/satellite/repair_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,17 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {

dialer := rpc.NewDefaultDialer(tlsOptions)

overlay, err := overlay.NewService(log.Named("overlay"), db.OverlayCache(), db.NodeEvents(), config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
overlayService, err := overlay.NewService(log.Named("overlay"), db.OverlayCache(), db.NodeEvents(), config.Placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
if err != nil {
return err
}

orders, err := orders.NewService(
log.Named("orders"),
signing.SignerFromFullIdentity(identity),
overlay,
overlayService,
orders.NewNoopDB(),
config.Placement.CreateFilters,
config.Orders,
)
if err != nil {
Expand All @@ -122,17 +123,18 @@ func cmdRepairSegment(cmd *cobra.Command, args []string) (err error) {
log.Named("segment-repair"),
metabaseDB,
orders,
overlay,
overlayService,
nil, // TODO add noop version
ecRepairer,
config.Placement.CreateFilters,
config.Checker.RepairOverrides,
config.Repairer,
)

// TODO reorganize to avoid using peer.

peer := &satellite.Repairer{}
peer.Overlay = overlay
peer.Overlay = overlayService
peer.Orders.Service = orders
peer.EcRepairer = ecRepairer
peer.SegmentRepairer = segmentRepairer
Expand Down
6 changes: 3 additions & 3 deletions cmd/tools/segment-verify/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ func verifySegments(cmd *cobra.Command, args []string) error {
dialer := rpc.NewDefaultDialer(tlsOptions)

// setup dependencies for verification
overlay, err := overlay.NewService(log.Named("overlay"), db.OverlayCache(), db.NodeEvents(), "", "", satelliteCfg.Overlay)
overlayService, err := overlay.NewService(log.Named("overlay"), db.OverlayCache(), db.NodeEvents(), overlay.NewPlacementRules().CreateFilters, "", "", satelliteCfg.Overlay)
if err != nil {
return Error.Wrap(err)
}

ordersService, err := orders.NewService(log.Named("orders"), signing.SignerFromFullIdentity(identity), overlay, orders.NewNoopDB(), satelliteCfg.Orders)
ordersService, err := orders.NewService(log.Named("orders"), signing.SignerFromFullIdentity(identity), overlayService, orders.NewNoopDB(), overlay.NewPlacementRules().CreateFilters, satelliteCfg.Orders)
if err != nil {
return Error.Wrap(err)
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func verifySegments(cmd *cobra.Command, args []string) error {

// setup verifier
verifier := NewVerifier(log.Named("verifier"), dialer, ordersService, verifyConfig)
service, err := NewService(log.Named("service"), metabaseDB, verifier, overlay, serviceConfig)
service, err := NewService(log.Named("service"), metabaseDB, verifier, overlayService, serviceConfig)
if err != nil {
return Error.Wrap(err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/jackc/pgx/v5 v5.3.1
github.com/jtolds/monkit-hw/v2 v2.0.0-20191108235325-141a0da276b3
github.com/jtolio/eventkit v0.0.0-20230607152326-4668f79ff72d
github.com/jtolio/mito v0.0.0-20230523171229-d78ef06bb77b
github.com/jtolio/noiseconn v0.0.0-20230301220541-88105e6c8ac6
github.com/loov/hrtime v1.0.3
github.com/mattn/go-sqlite3 v1.14.12
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ github.com/jtolds/tracetagger/v2 v2.0.0-rc5 h1:SriMFVtftPsQmG+0xaABotz9HnoKoo1QM
github.com/jtolds/tracetagger/v2 v2.0.0-rc5/go.mod h1:61Fh+XhbBONy+RsqkA+xTtmaFbEVL040m9FAF/hTrjQ=
github.com/jtolio/eventkit v0.0.0-20230607152326-4668f79ff72d h1:MAGZUXA8MLSA5oJT1Gua3nLSyTYF2uvBgM4Sfs5+jts=
github.com/jtolio/eventkit v0.0.0-20230607152326-4668f79ff72d/go.mod h1:PXFUrknJu7TkBNyL8t7XWDPtDFFLFrNQQAdsXv9YfJE=
github.com/jtolio/mito v0.0.0-20230523171229-d78ef06bb77b h1:HKvXTXZTeUHXRibg2ilZlkGSQP6A3cs0zXrBd4xMi6M=
github.com/jtolio/mito v0.0.0-20230523171229-d78ef06bb77b/go.mod h1:Mrym6OnPMkBKvN8/uXSkyhFSh6ndKKYE+Q4kxCfQ4V0=
github.com/jtolio/noiseconn v0.0.0-20230301220541-88105e6c8ac6 h1:iVMQyk78uOpX/UKjEbzyBdptXgEz6jwGwo7kM9IQ+3U=
github.com/jtolio/noiseconn v0.0.0-20230301220541-88105e6c8ac6/go.mod h1:MEkhEPFwP3yudWO0lj6vfYpLIB+3eIcuIW+e0AZzUQk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down
1 change: 0 additions & 1 deletion private/testplanet/satellite.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,6 @@ func (planet *Planet) newRangedLoop(ctx context.Context, index int, db satellite

prefix := "satellite-ranged-loop" + strconv.Itoa(index)
log := planet.log.Named(prefix)

return satellite.NewRangedLoop(log, db, metabaseDB, &config, nil)
}

Expand Down
3 changes: 2 additions & 1 deletion satellite/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
{ // setup overlay
peer.Overlay.DB = peer.DB.OverlayCache()

peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, peer.DB.NodeEvents(), config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, peer.DB.NodeEvents(), config.Placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
Expand Down Expand Up @@ -387,6 +387,7 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
signing.SignerFromFullIdentity(peer.Identity),
peer.Overlay.Service,
peer.Orders.DB,
config.Placement.CreateFilters,
config.Orders,
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion satellite/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func NewAuditor(log *zap.Logger, full *identity.FullIdentity,

{ // setup overlay
var err error
peer.Overlay, err = overlay.NewService(log.Named("overlay"), overlayCache, nodeEvents, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
peer.Overlay, err = overlay.NewService(log.Named("overlay"), overlayCache, nodeEvents, config.Placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
Expand Down Expand Up @@ -183,6 +183,7 @@ func NewAuditor(log *zap.Logger, full *identity.FullIdentity,
// PUT and GET actions which are not used by
// auditor so we can set noop implementation.
orders.NewNoopDB(),
config.Placement.CreateFilters,
config.Orders,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion satellite/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,

{ // setup overlay
peer.Overlay.DB = peer.DB.OverlayCache()
peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, peer.DB.NodeEvents(), config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, peer.DB.NodeEvents(), config.Placement.CreateFilters, config.Console.ExternalAddress, config.Console.SatelliteName, config.Overlay)
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
Expand Down
2 changes: 2 additions & 0 deletions satellite/metabase/rangedloop/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/rangedloop/rangedlooptest"
"storj.io/storj/satellite/metrics"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
)

Expand Down Expand Up @@ -426,6 +427,7 @@ func TestAllInOne(t *testing.T) {
satellite.DB.RepairQueue(),
satellite.Overlay.Service,
satellite.Config.Checker,
overlay.NewPlacementRules().CreateFilters,
[]string{},
),
})
Expand Down
102 changes: 102 additions & 0 deletions satellite/metainfo/endpoint_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (

"storj.io/common/errs2"
"storj.io/common/identity"
"storj.io/common/identity/testidentity"
"storj.io/common/memory"
"storj.io/common/nodetag"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
Expand All @@ -37,6 +39,9 @@ import (
"storj.io/storj/satellite/internalpb"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/overlay"
"storj.io/storj/storagenode"
"storj.io/storj/storagenode/contact"
"storj.io/uplink"
"storj.io/uplink/private/metaclient"
"storj.io/uplink/private/object"
Expand Down Expand Up @@ -2450,3 +2455,100 @@ func TestListUploads(t *testing.T) {
require.Equal(t, 1000, items)
})
}

func TestPlacements(t *testing.T) {
ctx := testcontext.New(t)

satelliteIdentity := signing.SignerFromFullIdentity(testidentity.MustPregeneratedSignedIdentity(0, storj.LatestIDVersion()))

placementRules := overlay.ConfigurablePlacementRule{}
err := placementRules.Set(fmt.Sprintf(`16:tag("%s", "certified","true")`, satelliteIdentity.ID()))
require.NoError(t, err)

testplanet.Run(t,
testplanet.Config{
SatelliteCount: 1,
StorageNodeCount: 12,
UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.Metainfo.RS.Min = 3
config.Metainfo.RS.Repair = 4
config.Metainfo.RS.Success = 5
config.Metainfo.RS.Total = 6
config.Metainfo.MaxInlineSegmentSize = 1
config.Placement = placementRules
},
StorageNode: func(index int, config *storagenode.Config) {
if index%2 == 0 {
tags := &pb.NodeTagSet{
NodeId: testidentity.MustPregeneratedSignedIdentity(index+1, storj.LatestIDVersion()).ID.Bytes(),
Timestamp: time.Now().Unix(),
Tags: []*pb.Tag{
{
Name: "certified",
Value: []byte("true"),
},
},
}
signed, err := nodetag.Sign(ctx, tags, satelliteIdentity)
require.NoError(t, err)

config.Contact.Tags = contact.SignedTags(pb.SignedNodeTagSets{
Tags: []*pb.SignedNodeTagSet{
signed,
},
})
}

},
},
},
func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]
buckets := satellite.API.Buckets.Service
uplink := planet.Uplinks[0]
projectID := uplink.Projects[0].ID

// create buckets with different placement (placement 16 is configured above)
createGeofencedBucket(t, ctx, buckets, projectID, "constrained", 16)

objectNo := 10
for i := 0; i < objectNo; i++ {
// upload an object to one of the global buckets
err := uplink.Upload(ctx, satellite, "constrained", "testobject"+strconv.Itoa(i), make([]byte, 10240))
require.NoError(t, err)
}

apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
metainfoClient, err := uplink.DialMetainfo(ctx, satellite, apiKey)
require.NoError(t, err)
defer func() {
_ = metainfoClient.Close()
}()

objects, _, err := metainfoClient.ListObjects(ctx, metaclient.ListObjectsParams{
Bucket: []byte("constrained"),
})
require.NoError(t, err)
require.Len(t, objects, objectNo)

for _, listedObject := range objects {
o, err := metainfoClient.DownloadObject(ctx, metaclient.DownloadObjectParams{
Bucket: []byte("constrained"),
EncryptedObjectKey: listedObject.EncryptedObjectKey,
})
require.NoError(t, err)

for _, limit := range o.DownloadedSegments[0].Limits {
if limit != nil {
// starting from 2 (first identity used for satellite, SN with even index are fine)
for i := 2; i < 11; i += 2 {
require.NotEqual(t, testidentity.MustPregeneratedSignedIdentity(i, storj.LatestIDVersion()).ID, limit.Limit.StorageNodeId)
}
}
}
}
},
)
}
23 changes: 13 additions & 10 deletions satellite/orders/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ type Overlay interface {
//
// architecture: Service
type Service struct {
log *zap.Logger
satellite signing.Signer
overlay Overlay
orders DB
log *zap.Logger
satellite signing.Signer
overlay Overlay
orders DB
placementRules overlay.PlacementRules

encryptionKeys EncryptionKeys

Expand All @@ -70,17 +71,18 @@ type Service struct {
// NewService creates new service for creating order limits.
func NewService(
log *zap.Logger, satellite signing.Signer, overlay Overlay,
orders DB, config Config,
orders DB, placementRules overlay.PlacementRules, config Config,
) (*Service, error) {
if config.EncryptionKeys.Default.IsZero() {
return nil, Error.New("encryption keys must be specified to include encrypted metadata")
}

return &Service{
log: log,
satellite: satellite,
overlay: overlay,
orders: orders,
log: log,
satellite: satellite,
overlay: overlay,
orders: orders,
placementRules: placementRules,

encryptionKeys: config.EncryptionKeys,

Expand Down Expand Up @@ -146,8 +148,9 @@ func (service *Service) CreateGetOrderLimits(ctx context.Context, bucket metabas
}

if segment.Placement != storj.EveryCountry {
filter := service.placementRules(segment.Placement)
for id, node := range nodes {
if !segment.Placement.AllowedCountry(node.CountryCode) {
if !filter.MatchInclude(node) {
delete(nodes, id)
}
}
Expand Down
17 changes: 10 additions & 7 deletions satellite/orders/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
)

func TestGetOrderLimits(t *testing.T) {
Expand Down Expand Up @@ -55,14 +56,16 @@ func TestGetOrderLimits(t *testing.T) {
CachedGetOnlineNodesForGet(gomock.Any(), gomock.Any()).
Return(nodes, nil).AnyTimes()

service, err := orders.NewService(zaptest.NewLogger(t), k, overlayService, orders.NewNoopDB(), orders.Config{
EncryptionKeys: orders.EncryptionKeys{
Default: orders.EncryptionKey{
ID: orders.EncryptionKeyID{1, 2, 3, 4, 5, 6, 7, 8},
Key: testrand.Key(),
service, err := orders.NewService(zaptest.NewLogger(t), k, overlayService, orders.NewNoopDB(),
overlay.NewPlacementRules().CreateFilters,
orders.Config{
EncryptionKeys: orders.EncryptionKeys{
Default: orders.EncryptionKey{
ID: orders.EncryptionKeyID{1, 2, 3, 4, 5, 6, 7, 8},
Key: testrand.Key(),
},
},
},
})
})
require.NoError(t, err)

segment := metabase.Segment{
Expand Down
2 changes: 1 addition & 1 deletion satellite/overlay/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func BenchmarkNodeSelection(b *testing.B) {
}
})

service, err := overlay.NewService(zap.NewNop(), overlaydb, db.NodeEvents(), "", "", overlay.Config{
service, err := overlay.NewService(zap.NewNop(), overlaydb, db.NodeEvents(), overlay.NewPlacementRules().CreateFilters, "", "", overlay.Config{
Node: nodeSelectionConfig,
NodeSelectionCache: overlay.UploadSelectionCacheConfig{
Staleness: time.Hour,
Expand Down

0 comments on commit 97a89c3

Please sign in to comment.