satellite/audit: add audit.ReverifyWorker
Here we add a worker type comparable to audit.Worker, which will be
responsible for pulling items off the reverification queue and
calling reverifier.ReverifyPiece on them.

Note that piecewise reverification audits (which this will control) are
not yet being done. That is, nothing is being added to the
reverification queue at this point.

Refs: #5251
Change-Id: I94e28830e27caa49f2c8bd4a2336533e187ab69c
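
For orientation, the contract the new worker expects from its queue can be sketched as follows. This is inferred from the worker's usage in the diff below (GetNextJob, ErrEmptyQueue); the authoritative interface lives elsewhere in the satellite/audit package and may carry more methods.

    type ReverifyQueue interface {
        // GetNextJob returns a reverification job that has not been attempted
        // within the last retryInterval, or an ErrEmptyQueue-classed error
        // when no such job is available.
        GetNextJob(ctx context.Context, retryInterval time.Duration) (*ReverificationJob, error)
    }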
thepaul authored and Storj Robot committed Dec 12, 2022
1 parent 1854351 commit c575a21
Showing 4 changed files with 160 additions and 28 deletions.
16 changes: 10 additions & 6 deletions private/testplanet/satellite.go
@@ -133,12 +133,14 @@ type Satellite struct {
     }

     Audit struct {
-        VerifyQueue audit.VerifyQueue
-        Worker      *audit.Worker
-        Chore       *audit.Chore
-        Verifier    *audit.Verifier
-        Reverifier  *audit.Reverifier
-        Reporter    audit.Reporter
+        VerifyQueue    audit.VerifyQueue
+        ReverifyQueue  audit.ReverifyQueue
+        Worker         *audit.Worker
+        ReverifyWorker *audit.ReverifyWorker
+        Chore          *audit.Chore
+        Verifier       *audit.Verifier
+        Reverifier     *audit.Reverifier
+        Reporter       audit.Reporter
     }

     Reputation struct {
@@ -610,7 +612,9 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
     system.Repair.Repairer = repairerPeer.Repairer

     system.Audit.VerifyQueue = peer.Audit.VerifyQueue
+    system.Audit.ReverifyQueue = peer.Audit.ReverifyQueue
     system.Audit.Worker = peer.Audit.Worker
+    system.Audit.ReverifyWorker = peer.Audit.ReverifyWorker
     system.Audit.Chore = peer.Audit.Chore
     system.Audit.Verifier = peer.Audit.Verifier
     system.Audit.Reverifier = peer.Audit.Reverifier
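With these fields exposed on the testplanet Satellite, a test can drive the new worker directly. A sketch (assuming some future change has enqueued reverification jobs, which this commit does not; Pause and TriggerWait are the standard sync2.Cycle controls):

    audits := planet.Satellites[0].Audit
    audits.ReverifyWorker.Loop.Pause()       // stop the background cadence
    // ... enqueue reverification jobs (not part of this commit) ...
    audits.ReverifyWorker.Loop.TriggerWait() // run exactly one pass and wait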
114 changes: 114 additions & 0 deletions satellite/audit/reverifyworker.go
@@ -0,0 +1,114 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package audit

import (
    "context"
    "time"

    "go.uber.org/zap"

    "storj.io/common/sync2"
)

// ReverifyWorker processes reverifications (retrying piece audits against nodes that timed out
// during a Verification).
type ReverifyWorker struct {
    log        *zap.Logger
    queue      ReverifyQueue
    reverifier *Reverifier
    reporter   Reporter

    Loop          *sync2.Cycle
    concurrency   int
    retryInterval time.Duration
}

// NewReverifyWorker creates a new ReverifyWorker.
func NewReverifyWorker(log *zap.Logger, queue ReverifyQueue, reverifier *Reverifier, reporter Reporter, config Config) *ReverifyWorker {
    return &ReverifyWorker{
        log:           log,
        queue:         queue,
        reverifier:    reverifier,
        reporter:      reporter,
        Loop:          sync2.NewCycle(config.QueueInterval),
        concurrency:   config.ReverifyWorkerConcurrency,
        retryInterval: config.ReverificationRetryInterval,
    }
}

// Run runs a ReverifyWorker.
func (worker *ReverifyWorker) Run(ctx context.Context) (err error) {
    defer mon.Task()(&ctx)(&err)

    return worker.Loop.Run(ctx, func(ctx context.Context) (err error) {
        err = worker.process(ctx)
        if err != nil {
            worker.log.Error("failure processing reverify queue", zap.Error(Error.Wrap(err)))
        }
        return nil
    })
}

func (worker *ReverifyWorker) process(ctx context.Context) (err error) {
    defer mon.Task()(&ctx)(&err)

    limiter := sync2.NewLimiter(worker.concurrency)
    defer limiter.Wait()

    for {
        // We start the timeout clock _before_ pulling the next job from
        // the queue. This gives us the best chance of having this worker
        // terminate and get cleaned up before another reverification
        // worker tries to take over the job.
        //
        // (If another worker does take over the job before this worker
        // has been cleaned up, it is ok; the downside should only be
        // duplication of work and monkit stats.)
        ctx, cancel := context.WithTimeout(ctx, worker.retryInterval)

        reverifyJob, err := worker.queue.GetNextJob(ctx, worker.retryInterval)
        if err != nil {
            cancel()
            if ErrEmptyQueue.Has(err) {
                return nil
            }
            return err
        }

        started := limiter.Go(ctx, func() {
            defer cancel()

            logger := worker.log.With(
                zap.Stringer("Segment StreamID", reverifyJob.Locator.StreamID),
                zap.Uint64("Segment Position", reverifyJob.Locator.Position.Encode()),
                zap.Stringer("Node ID", reverifyJob.Locator.NodeID),
                zap.Int("Piece Number", reverifyJob.Locator.PieceNum))
            worker.work(ctx, logger, reverifyJob)
        })
        if !started {
            cancel()
            return ctx.Err()
        }
    }
}

func (worker *ReverifyWorker) work(ctx context.Context, logger *zap.Logger, job *ReverificationJob) {
    defer mon.Task()(&ctx)(nil)

    logger.Debug("beginning piecewise audit")
    outcome, reputation := worker.reverifier.ReverifyPiece(ctx, logger, &job.Locator)
    logger.Debug("piecewise audit complete", zap.Int("outcome", int(outcome)))

    err := worker.reporter.RecordReverificationResult(ctx, job, outcome, reputation)
    if err != nil {
        logger.Error("finished with audit, but failed to remove entry from queue", zap.Error(err))
    }
}

// Close halts the worker.
func (worker *ReverifyWorker) Close() error {
    worker.Loop.Close()
    return nil
}
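
For reference, the audit.Config fields NewReverifyWorker reads, with their apparent roles. This is a sketch inferred from the constructor above; the real struct in satellite/audit/worker.go carries more fields plus default/help tags, and the stated semantics are assumptions.

    type Config struct {
        // ... existing audit settings elided ...
        QueueInterval               time.Duration // polling cadence, shared with audit.Worker
        ReverifyWorkerConcurrency   int           // maximum reverification jobs in flight at once
        ReverificationRetryInterval time.Duration // how long a claimed job stays unavailable to other workers
    }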
26 changes: 14 additions & 12 deletions satellite/audit/worker.go
@@ -38,24 +38,26 @@ type Config struct {

 // Worker contains information for populating audit queue and processing audits.
 type Worker struct {
-    log         *zap.Logger
-    queue       VerifyQueue
-    verifier    *Verifier
-    reporter    Reporter
-    Loop        *sync2.Cycle
-    concurrency int
+    log           *zap.Logger
+    queue         VerifyQueue
+    verifier      *Verifier
+    reverifyQueue ReverifyQueue
+    reporter      Reporter
+    Loop          *sync2.Cycle
+    concurrency   int
 }

 // NewWorker instantiates Worker.
-func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reporter Reporter, config Config) *Worker {
+func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reverifyQueue ReverifyQueue, reporter Reporter, config Config) *Worker {
     return &Worker{
         log: log,

-        queue:       queue,
-        verifier:    verifier,
-        reporter:    reporter,
-        Loop:        sync2.NewCycle(config.QueueInterval),
-        concurrency: config.WorkerConcurrency,
+        queue:         queue,
+        verifier:      verifier,
+        reverifyQueue: reverifyQueue,
+        reporter:      reporter,
+        Loop:          sync2.NewCycle(config.QueueInterval),
+        concurrency:   config.WorkerConcurrency,
     }
 }

32 changes: 22 additions & 10 deletions satellite/core.go
@@ -116,13 +116,14 @@ type Core struct {
     }

     Audit struct {
-        VerifyQueue   audit.VerifyQueue
-        ReverifyQueue audit.ReverifyQueue
-        Worker        *audit.Worker
-        Chore         *audit.Chore
-        Verifier      *audit.Verifier
-        Reverifier    *audit.Reverifier
-        Reporter      audit.Reporter
+        VerifyQueue    audit.VerifyQueue
+        ReverifyQueue  audit.ReverifyQueue
+        Worker         *audit.Worker
+        ReverifyWorker *audit.ReverifyWorker
+        Chore          *audit.Chore
+        Verifier       *audit.Verifier
+        Reverifier     *audit.Reverifier
+        Reporter       audit.Reporter
     }

     ExpiredDeletion struct {
@@ -408,6 +409,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
     config := config.Audit

     peer.Audit.VerifyQueue = db.VerifyQueue()
+    peer.Audit.ReverifyQueue = db.ReverifyQueue()

     peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
         peer.Metainfo.Metabase,
@@ -437,6 +439,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
     peer.Audit.Worker = audit.NewWorker(peer.Log.Named("audit:worker"),
         peer.Audit.VerifyQueue,
         peer.Audit.Verifier,
+        peer.Audit.ReverifyQueue,
         peer.Audit.Reporter,
         config,
     )
@@ -448,9 +451,18 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB,
     peer.Debug.Server.Panel.Add(
         debug.Cycle("Audit Worker", peer.Audit.Worker.Loop))

-    if err != nil {
-        return nil, errs.Combine(err, peer.Close())
-    }
+    peer.Audit.ReverifyWorker = audit.NewReverifyWorker(peer.Log.Named("audit:reverify-worker"),
+        peer.Audit.ReverifyQueue,
+        peer.Audit.Reverifier,
+        peer.Audit.Reporter,
+        config)
+    peer.Services.Add(lifecycle.Item{
+        Name:  "audit:reverify-worker",
+        Run:   peer.Audit.ReverifyWorker.Run,
+        Close: peer.Audit.ReverifyWorker.Close,
+    })
+    peer.Debug.Server.Panel.Add(
+        debug.Cycle("Audit Reverify Worker", peer.Audit.ReverifyWorker.Loop))

     peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
         peer.Audit.VerifyQueue,
