Skip to content

Commit

Permalink
private/testplanet: integrate GC bloom filter service
Browse files Browse the repository at this point in the history
We would like to have separate process/command to collect bloom
filters from source different than production DBs. Such process will
use segment loop to build bloom filters for all storage nodes and
will send it to Storj bucket.
This change adds integration with testplanet which makes writing
unit tests possible.

Updates storj/team-metainfo#120

Change-Id: I7b335c5dafa8cffe265c56b75d8c8f8567580893
  • Loading branch information
mniewrzal authored and Storj Robot committed Sep 2, 2022
1 parent ddc850d commit d905931
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 4 deletions.
34 changes: 31 additions & 3 deletions private/testplanet/satellite.go
Expand Up @@ -41,6 +41,7 @@ import (
"storj.io/storj/satellite/console/consoleweb"
"storj.io/storj/satellite/contact"
"storj.io/storj/satellite/gc"
"storj.io/storj/satellite/gc/bloomfilter"
"storj.io/storj/satellite/gracefulexit"
"storj.io/storj/satellite/inspector"
"storj.io/storj/satellite/mailservice"
Expand Down Expand Up @@ -70,6 +71,7 @@ type Satellite struct {
Repairer *satellite.Repairer
Admin *satellite.Admin
GC *satellite.GarbageCollection
GCBF *satellite.GarbageCollectionBF

Log *zap.Logger
Identity *identity.FullIdentity
Expand Down Expand Up @@ -134,7 +136,8 @@ type Satellite struct {
}

GarbageCollection struct {
Service *gc.Service
Service *gc.Service
BloomFilters *bloomfilter.Service
}

ExpiredDeletion struct {
Expand Down Expand Up @@ -283,6 +286,7 @@ func (system *Satellite) Close() error {
system.Repairer.Close(),
system.Admin.Close(),
system.GC.Close(),
system.GCBF.Close(),
)
}

Expand All @@ -305,6 +309,9 @@ func (system *Satellite) Run(ctx context.Context) (err error) {
group.Go(func() error {
return errs2.IgnoreCanceled(system.GC.Run(ctx))
})
group.Go(func() error {
return errs2.IgnoreCanceled(system.GCBF.Run(ctx))
})
return group.Wait()
}

Expand Down Expand Up @@ -527,18 +534,23 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}

gcBFPeer, err := planet.newGarbageCollectionBF(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}

if config.EmailReminders.Enable {
peer.Mail.EmailReminders.TestSetLinkAddress("http://" + api.Console.Listener.Addr().String() + "/")
}

return createNewSystem(prefix, log, config, peer, api, repairerPeer, adminPeer, gcPeer), nil
return createNewSystem(prefix, log, config, peer, api, repairerPeer, adminPeer, gcPeer, gcBFPeer), nil
}

// createNewSystem makes a new Satellite System and exposes the same interface from
// before we split out the API. In the short term this will help keep all the tests passing
// without much modification needed. However long term, we probably want to rework this
// so it represents how the satellite will run when it is made up of many processes.
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection) *Satellite {
func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer *satellite.Core, api *satellite.API, repairerPeer *satellite.Repairer, adminPeer *satellite.Admin, gcPeer *satellite.GarbageCollection, gcBFPeer *satellite.GarbageCollectionBF) *Satellite {
system := &Satellite{
Name: name,
Config: config,
Expand All @@ -547,6 +559,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
Repairer: repairerPeer,
Admin: adminPeer,
GC: gcPeer,
GCBF: gcBFPeer,
}
system.Log = log
system.Identity = peer.Identity
Expand Down Expand Up @@ -587,6 +600,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Audit.Reporter = peer.Audit.Reporter

system.GarbageCollection.Service = gcPeer.GarbageCollection.Service
system.GarbageCollection.BloomFilters = gcBFPeer.GarbageCollection.Service

system.ExpiredDeletion.Chore = peer.ExpiredDeletion.Chore
system.ZombieDeletion.Chore = peer.ZombieDeletion.Chore
Expand Down Expand Up @@ -683,6 +697,20 @@ func (planet *Planet) newGarbageCollection(ctx context.Context, index int, ident
return satellite.NewGarbageCollection(log, identity, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}

func (planet *Planet) newGarbageCollectionBF(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB *metabase.DB, config satellite.Config, versionInfo version.Info) (_ *satellite.GarbageCollectionBF, err error) {
defer mon.Task()(&ctx)(&err)

prefix := "satellite-gc-bf" + strconv.Itoa(index)
log := planet.log.Named(prefix)

revocationDB, err := revocation.OpenDBFromCfg(ctx, config.Server.Config)
if err != nil {
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewGarbageCollectionBF(log, identity, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}

// atLeastOne returns 1 if value < 1, or value otherwise.
func atLeastOne(value int) int {
if value < 1 {
Expand Down
3 changes: 2 additions & 1 deletion satellite/gc/bloomfilter/service.go
Expand Up @@ -21,7 +21,8 @@ var mon = monkit.Package()
// Config contains configurable values for garbage collection.
type Config struct {
Interval time.Duration `help:"the time between each garbage collection executions" releaseDefault:"120h" devDefault:"10m" testDefault:"$TESTINTERVAL"`
Enabled bool `help:"set if garbage collection bloom filters is enabled or not" releaseDefault:"true" devDefault:"true"`
// TODO service is not enabled by default for testing until will be finished
Enabled bool `help:"set if garbage collection bloom filters is enabled or not" default:"true" testDefault:"false"`

// value for InitialPieces currently based on average pieces per node
InitialPieces int `help:"the initial number of pieces expected for a storage node to have, used for creating a filter" releaseDefault:"400000" devDefault:"10"`
Expand Down
30 changes: 30 additions & 0 deletions satellite/gc/bloomfilter/service_test.go
@@ -0,0 +1,30 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package bloomfilter_test

import (
"testing"

"go.uber.org/zap"

"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
)

func TestGarbageCollectionBloomFilters(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
config.GarbageCollectionBF.Enabled = true
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
// TODO test will be replaced with something more meaningful when service
// will be fully implemented
planet.Satellites[0].GarbageCollection.BloomFilters.Loop.Pause()
planet.Satellites[0].GarbageCollection.BloomFilters.Loop.TriggerWait()
})
}

0 comments on commit d905931

Please sign in to comment.