Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype: separate repair service from satellite #2836

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 52 additions & 2 deletions cmd/satellite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,15 @@ var (
Args: cobra.MinimumNArgs(3),
RunE: cmdValueAttribution,
}
runRepairCmd = &cobra.Command{
Use: "repair",
Short: "Run the satellite repair process",
RunE: cmdRunRepairProcess,
}

runCfg Satellite
setupCfg Satellite
runCfg Satellite
setupCfg Satellite
runRepairCfg satellite.RepairProcessConfig

qdiagCfg struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"sqlite3://$CONFDIR/master.db"`
Expand Down Expand Up @@ -104,7 +110,9 @@ func init() {
rootCmd.AddCommand(reportsCmd)
reportsCmd.AddCommand(nodeUsageCmd)
reportsCmd.AddCommand(partnerAttributionCmd)
runCmd.AddCommand(runRepairCmd)
process.Bind(runCmd, &runCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(runRepairCmd, &runRepairCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(setupCmd, &setupCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir), cfgstruct.SetupMode())
process.Bind(qdiagCmd, &qdiagCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeUsageCmd, &nodeUsageCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
Expand Down Expand Up @@ -162,6 +170,48 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
return errs.Combine(runError, closeError)
}

// cmdRunRepairProcess is the handler of the "repair" subcommand. It loads the
// satellite identity, opens the master and revocation databases, constructs
// the repair process from runRepairCfg and runs it until Run returns.
//
// The returned error combines the run error, the peer close error and any
// error produced while closing the databases.
func cmdRunRepairProcess(cmd *cobra.Command, args []string) (err error) {
	ctx := process.Ctx(cmd)
	log := zap.L()

	// NOTE(review): identity, database and TLS settings are read from runCfg,
	// while only runRepairCfg is bound to this subcommand - confirm that runCfg
	// is populated when "repair" is invoked.
	identity, err := runCfg.Identity.Load()
	if err != nil {
		zap.S().Fatal(err)
	}

	repairDB, err := satellitedb.New(log.Named("db"), runCfg.Database)
	if err != nil {
		return errs.New("Error starting master database on satellite for repair process: %+v", err)
	}
	defer func() {
		err = errs.Combine(err, repairDB.Close())
	}()

	err = repairDB.CreateTables()
	if err != nil {
		return errs.New("Error creating tables for master database on satellite: %+v", err)
	}

	revDB, err := revocation.NewDBFromCfg(runCfg.Config.Server.Config)
	if err != nil {
		return errs.New("Error creating revocation database: %+v", err)
	}
	// Release the revocation database on every exit path, mirroring the
	// handling of the master database above; previously it was never closed.
	defer func() {
		err = errs.Combine(err, revDB.Close())
	}()

	peer, err := satellite.NewRepairProcess(log, identity, repairDB,
		revDB, &runRepairCfg,
	)
	if err != nil {
		return err
	}

	// Telemetry is best-effort: a failure is logged and the process continues.
	if err := process.InitMetricsWithCertPath(ctx, log, nil, runCfg.Identity.CertPath); err != nil {
		zap.S().Error("Failed to initialize telemetry batcher: ", err)
	}

	runError := peer.Run(ctx)
	closeError := peer.Close()
	return errs.Combine(runError, closeError)
}

func cmdSetup(cmd *cobra.Command, args []string) (err error) {
setupDir, err := filepath.Abs(confDir)
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion internal/testplanet/planet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"storj.io/storj/pkg/storj"
"storj.io/storj/satellite"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/repair/checker"
"storj.io/storj/satellite/repair/repairer"
"storj.io/storj/storagenode"
"storj.io/storj/versioncontrol"
)
Expand Down Expand Up @@ -68,7 +70,7 @@ type Planet struct {

Bootstrap *bootstrap.Peer
VersionControl *versioncontrol.Peer
Satellites []*satellite.Peer
Satellites []*SatelliteSystem
StorageNodes []*storagenode.Peer
Uplinks []*Uplink

Expand All @@ -79,6 +81,15 @@ type Planet struct {
cancel func()
}

// SatelliteSystem contains all the processes needed to run a full Satellite setup.
//
// It embeds satellite.Peer by value, so the peer's fields are reachable
// directly on the system, and carries the separately created repair services
// next to it.
type SatelliteSystem struct {
	satellite.Peer
	// Repair groups the services taken from the satellite's repair process.
	Repair struct {
		// Checker is the repair checker belonging to this satellite.
		Checker *checker.Checker
		// Repairer is the repairer service belonging to this satellite.
		Repairer *repairer.Service
	}
}

type closablePeer struct {
peer Peer

Expand Down
124 changes: 120 additions & 4 deletions internal/testplanet/satellite.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (
)

// newSatellites initializes satellites
func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
var xs []*satellite.Peer
func (planet *Planet) newSatellites(count int) ([]*SatelliteSystem, error) {
var xs []*SatelliteSystem
defer func() {
for _, x := range xs {
planet.peers = append(planet.peers, closablePeer{peer: x})
Expand Down Expand Up @@ -225,9 +225,125 @@ func (planet *Planet) newSatellites(count int) ([]*satellite.Peer, error) {
if err != nil {
return xs, err
}

log.Debug("id=" + peer.ID().String() + " addr=" + peer.Addr())
xs = append(xs, peer)

system := SatelliteSystem{Peer: *peer}
repairProcess, err := planet.newRepairProcess(i, storageDir)
system.Repair.Checker = repairProcess.Repair.Checker
system.Repair.Repairer = repairProcess.Repair.Repairer

xs = append(xs, &system)
}
return xs, nil
}

func (planet *Planet) newRepairProcess(count int, storageDir string) (*satellite.RepairProcess, error) {
prefix := "repairProcess" + strconv.Itoa(count)
log := planet.log.Named(prefix)

identity, err := planet.NewIdentity()
if err != nil {
return nil, err
}

var repairDB satellite.RepairProcessDB
repairDB, err = satellitedb.NewInMemory(log.Named("db"))
if err != nil {
return nil, err
}

err = repairDB.CreateTables()
if err != nil {
return nil, err
}
planet.databases = append(planet.databases, repairDB)

config := satellite.RepairProcessConfig{
Server: server.Config{
Address: "127.0.0.1:0",
PrivateAddress: "127.0.0.1:0",

Config: tlsopts.Config{
RevocationDBURL: "bolt://" + filepath.Join(storageDir, "revocation.db"),
UsePeerCAWhitelist: true,
PeerCAWhitelistPath: planet.whitelistPath,
PeerIDVersions: "latest",
Extensions: extensions.Config{
Revocation: false,
WhitelistSignedLeaf: false,
},
},
},
Overlay: overlay.Config{
Node: overlay.NodeSelectionConfig{
UptimeCount: 0,
AuditCount: 0,
NewNodePercentage: 0,
OnlineWindow: 0,
DistinctIP: false,

AuditReputationRepairWeight: 1,
AuditReputationUplinkWeight: 1,
AuditReputationAlpha0: 1,
AuditReputationBeta0: 0,
AuditReputationLambda: 0.95,
AuditReputationWeight: 1,
AuditReputationDQ: 0.6,
UptimeReputationRepairWeight: 1,
UptimeReputationUplinkWeight: 1,
UptimeReputationAlpha0: 2,
UptimeReputationBeta0: 0,
UptimeReputationLambda: 0.99,
UptimeReputationWeight: 1,
UptimeReputationDQ: 0.6,
},
UpdateStatsBatchSize: 100,
},
Metainfo: metainfo.Config{
DatabaseURL: "bolt://" + filepath.Join(storageDir, "pointers.db"),
MinRemoteSegmentSize: 0, // TODO: fix tests to work with 1024
MaxInlineSegmentSize: 8000,
Overlay: true,
RS: metainfo.RSConfig{
MaxSegmentSize: 64 * memory.MiB,
MaxBufferMem: memory.Size(256),
ErasureShareSize: memory.Size(256),
MinThreshold: (planet.config.StorageNodeCount * 1 / 5),
RepairThreshold: (planet.config.StorageNodeCount * 2 / 5),
SuccessThreshold: (planet.config.StorageNodeCount * 3 / 5),
MaxThreshold: (planet.config.StorageNodeCount * 4 / 5),
Validate: false,
},
Loop: metainfo.LoopConfig{
CoalesceDuration: 5 * time.Second,
},
},
Orders: orders.Config{
Expiration: 7 * 24 * time.Hour,
},
Checker: checker.Config{
Interval: 30 * time.Second,
IrreparableInterval: 15 * time.Second,
ReliabilityCacheStaleness: 5 * time.Minute,
},
Repairer: repairer.Config{
MaxRepair: 10,
Interval: time.Hour,
Timeout: 1 * time.Minute, // Repairs can take up to 10 seconds. Leaving room for outliers
MaxBufferMem: 4 * memory.MiB,
MaxExcessRateOptimalThreshold: 0.05,
},
}
revDB, err := revocation.NewDBFromCfg(config.Server.Config)
if err != nil {
return &satellite.RepairProcess{}, errs.New("Error creating revocation database: %+v", err)
}

repairProcess, err := satellite.NewRepairProcess(log, identity,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to shut down the repair process when testplanet is closed.

repairDB, revDB, &config,
)
if err != nil {
return &satellite.RepairProcess{}, err
}
return repairProcess, nil
}
25 changes: 12 additions & 13 deletions internal/testplanet/uplink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"storj.io/storj/pkg/peertls/tlsopts"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/satellite"
"storj.io/storj/satellite/console"
"storj.io/storj/uplink"
"storj.io/storj/uplink/metainfo"
Expand Down Expand Up @@ -156,22 +155,22 @@ func (client *Uplink) DialPiecestore(ctx context.Context, destination Peer) (*pi
}

// Upload data to specific satellite
func (client *Uplink) Upload(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path, data []byte) error {
func (client *Uplink) Upload(ctx context.Context, satellite *SatelliteSystem, bucket string, path storj.Path, data []byte) error {
	// Delegate to the expiration-aware variant with the zero time.
	var noExpiration time.Time
	return client.UploadWithExpiration(ctx, satellite, bucket, path, data, noExpiration)
}

// UploadWithExpiration data to specific satellite and expiration time
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *satellite.Peer, bucket string, path storj.Path, data []byte, expiration time.Time) error {
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *SatelliteSystem, bucket string, path storj.Path, data []byte, expiration time.Time) error {
	// A nil redundancy config is forwarded, i.e. no redundancy settings are supplied by this wrapper.
	var noRedundancy *uplink.RSConfig
	return client.UploadWithExpirationAndConfig(ctx, satellite, noRedundancy, bucket, path, data, expiration)
}

// UploadWithConfig uploads data to specific satellite with configured values
func (client *Uplink) UploadWithConfig(ctx context.Context, satellite *satellite.Peer, redundancy *uplink.RSConfig, bucket string, path storj.Path, data []byte) error {
func (client *Uplink) UploadWithConfig(ctx context.Context, satellite *SatelliteSystem, redundancy *uplink.RSConfig, bucket string, path storj.Path, data []byte) error {
	// Delegate to the full variant with the zero time as expiration.
	var noExpiration time.Time
	return client.UploadWithExpirationAndConfig(ctx, satellite, redundancy, bucket, path, data, noExpiration)
}

// UploadWithExpirationAndConfig uploads data to specific satellite with configured values and expiration time
func (client *Uplink) UploadWithExpirationAndConfig(ctx context.Context, satellite *satellite.Peer, redundancy *uplink.RSConfig, bucketName string, path storj.Path, data []byte, expiration time.Time) (err error) {
func (client *Uplink) UploadWithExpirationAndConfig(ctx context.Context, satellite *SatelliteSystem, redundancy *uplink.RSConfig, bucketName string, path storj.Path, data []byte, expiration time.Time) (err error) {
config := client.GetConfig(satellite)
if redundancy != nil {
if redundancy.MinThreshold > 0 {
Expand Down Expand Up @@ -211,7 +210,7 @@ func (client *Uplink) UploadWithExpirationAndConfig(ctx context.Context, satelli
}

// Download data from specific satellite
func (client *Uplink) Download(ctx context.Context, satellite *satellite.Peer, bucketName string, path storj.Path) ([]byte, error) {
func (client *Uplink) Download(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) ([]byte, error) {
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
if err != nil {
return nil, err
Expand All @@ -237,7 +236,7 @@ func (client *Uplink) Download(ctx context.Context, satellite *satellite.Peer, b
}

// DownloadStream returns stream for downloading data
func (client *Uplink) DownloadStream(ctx context.Context, satellite *satellite.Peer, bucketName string, path storj.Path) (_ io.ReadCloser, cleanup func() error, err error) {
func (client *Uplink) DownloadStream(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) (_ io.ReadCloser, cleanup func() error, err error) {
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
if err != nil {
return nil, nil, err
Expand All @@ -256,7 +255,7 @@ func (client *Uplink) DownloadStream(ctx context.Context, satellite *satellite.P
}

// DownloadStreamRange returns stream for downloading data
func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *satellite.Peer, bucketName string, path storj.Path, start, limit int64) (_ io.ReadCloser, cleanup func() error, err error) {
func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path, start, limit int64) (_ io.ReadCloser, cleanup func() error, err error) {
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
if err != nil {
return nil, nil, err
Expand All @@ -275,7 +274,7 @@ func (client *Uplink) DownloadStreamRange(ctx context.Context, satellite *satell
}

// Delete deletes an object at the path in a bucket
func (client *Uplink) Delete(ctx context.Context, satellite *satellite.Peer, bucketName string, path storj.Path) error {
func (client *Uplink) Delete(ctx context.Context, satellite *SatelliteSystem, bucketName string, path storj.Path) error {
project, bucket, err := client.GetProjectAndBucket(ctx, satellite, bucketName, client.GetConfig(satellite))
if err != nil {
return err
Expand All @@ -290,7 +289,7 @@ func (client *Uplink) Delete(ctx context.Context, satellite *satellite.Peer, buc
}

// CreateBucket creates a new bucket
func (client *Uplink) CreateBucket(ctx context.Context, satellite *satellite.Peer, bucketName string) error {
func (client *Uplink) CreateBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string) error {
project, err := client.GetProject(ctx, satellite)
if err != nil {
return err
Expand All @@ -312,7 +311,7 @@ func (client *Uplink) CreateBucket(ctx context.Context, satellite *satellite.Pee
}

// GetConfig returns a default config for a given satellite.
func (client *Uplink) GetConfig(satellite *satellite.Peer) uplink.Config {
func (client *Uplink) GetConfig(satellite *SatelliteSystem) uplink.Config {
config := getDefaultConfig()

apiKey, err := libuplink.ParseAPIKey(client.APIKey[satellite.ID()])
Expand Down Expand Up @@ -384,7 +383,7 @@ func (client *Uplink) NewLibuplink(ctx context.Context) (*libuplink.Uplink, erro
}

// GetProject returns a libuplink.Project which allows interactions with a specific project
func (client *Uplink) GetProject(ctx context.Context, satellite *satellite.Peer) (*libuplink.Project, error) {
func (client *Uplink) GetProject(ctx context.Context, satellite *SatelliteSystem) (*libuplink.Project, error) {
testLibuplink, err := client.NewLibuplink(ctx)
if err != nil {
return nil, err
Expand All @@ -404,7 +403,7 @@ func (client *Uplink) GetProject(ctx context.Context, satellite *satellite.Peer)
}

// GetProjectAndBucket returns a libuplink.Project and Bucket which allows interactions with a specific project and its buckets
func (client *Uplink) GetProjectAndBucket(ctx context.Context, satellite *satellite.Peer, bucketName string, clientCfg uplink.Config) (_ *libuplink.Project, _ *libuplink.Bucket, err error) {
func (client *Uplink) GetProjectAndBucket(ctx context.Context, satellite *SatelliteSystem, bucketName string, clientCfg uplink.Config) (_ *libuplink.Project, _ *libuplink.Bucket, err error) {
project, err := client.GetProject(ctx, satellite)
if err != nil {
return nil, nil, err
Expand Down
5 changes: 2 additions & 3 deletions pkg/kademlia/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"storj.io/storj/bootstrap"
"storj.io/storj/internal/testcontext"
"storj.io/storj/internal/testplanet"
"storj.io/storj/satellite"
"storj.io/storj/storagenode"
)

Expand Down Expand Up @@ -53,7 +52,7 @@ func TestMergePlanets(t *testing.T) {
alpha.Start(ctx)
beta.Start(ctx)

allSatellites := []*satellite.Peer{}
allSatellites := []*testplanet.SatelliteSystem{}
allSatellites = append(allSatellites, alpha.Satellites...)
allSatellites = append(allSatellites, beta.Satellites...)

Expand All @@ -71,7 +70,7 @@ func TestMergePlanets(t *testing.T) {
}
_ = group.Wait()

test := func(tag string, satellites []*satellite.Peer, storageNodes []*storagenode.Peer) string {
test := func(tag string, satellites []*testplanet.SatelliteSystem, storageNodes []*storagenode.Peer) string {
found, missing := 0, 0
for _, satellite := range satellites {
for _, storageNode := range storageNodes {
Expand Down
4 changes: 2 additions & 2 deletions satellite/audit/disqualification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,13 @@ func TestDisqualifiedNodeRemainsDisqualified(t *testing.T) {
})
}

func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *satellite.Peer, nodeID storj.NodeID) bool {
func isDisqualified(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) bool {
	// Look the node up through the satellite's overlay service; a lookup
	// failure fails the test immediately.
	dossier, err := satellite.Overlay.Service.Get(ctx, nodeID)
	require.NoError(t, err)

	// The node counts as disqualified once its Disqualified field is set.
	disqualified := dossier.Disqualified != nil
	return disqualified
}
func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *satellite.Peer, nodeID storj.NodeID) {
func disqualifyNode(t *testing.T, ctx *testcontext.Context, satellite *testplanet.SatelliteSystem, nodeID storj.NodeID) {
_, err := satellite.DB.OverlayCache().BatchUpdateStats(ctx, []*overlay.UpdateRequest{{
NodeID: nodeID,
IsUp: true,
Expand Down