satellite/audit: newContainment->containment
Now that all the reverification changes have been made and the old code
is out of the way, this commit renames the new things back to the old
names. Mostly, this involves renaming "newContainment" to "containment"
or "NewContainment" to "Containment", but there are a few other renames
that have been promised and are carried out here.

Refs: #5230
Change-Id: I34e2b857ea338acbb8421cdac18b17f2974f233c
thepaul authored and Storj Robot committed Dec 16, 2022
1 parent 99206fc commit fc905a1
Showing 16 changed files with 98 additions and 109 deletions.
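Before the file-by-file diff, here is a rough sketch of what the renames look like from a caller's point of view. It is illustrative only, not an excerpt from this commit, and it assumes the satellite.DB accessor and audit.Reporter wiring that the cmd/satellite and satellite/audit changes below operate on.

package example

import (
    "context"

    "storj.io/storj/satellite"
    "storj.io/storj/satellite/audit"
)

// recordPending shows the two most visible renames: the DB accessor is
// Containment() again (NewContainment() during the transition), and the
// report field is PendingAudits again (PieceAudits during the transition).
func recordPending(ctx context.Context, db satellite.DB, reporter audit.Reporter, jobs []*audit.ReverificationJob) {
    // The store itself is unchanged; only the accessor and interface names revert.
    containment := db.Containment() // formerly db.NewContainment()

    // Report.PendingAudits (formerly Report.PieceAudits) carries jobs whose
    // reverification is still outstanding.
    reporter.RecordAudits(ctx, audit.Report{PendingAudits: jobs})

    // Each contained node has a pending ReverificationJob keyed by node ID.
    for _, job := range jobs {
        _, _ = containment.Get(ctx, job.Locator.NodeID)
    }
}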
2 changes: 1 addition & 1 deletion cmd/satellite/fetchpieces.go
@@ -91,7 +91,7 @@ func cmdFetchPieces(cmd *cobra.Command, args []string) (err error) {
db.OverlayCache(),
db.NodeEvents(),
db.Reputation(),
db.NewContainment(),
db.Containment(),
rollupsWriteCache,
version.Build,
&runCfg.Config,
2 changes: 1 addition & 1 deletion cmd/satellite/repairer.go
@@ -71,7 +71,7 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
db.OverlayCache(),
db.NodeEvents(),
db.Reputation(),
db.NewContainment(),
db.Containment(),
rollupsWriteCache,
version.Build,
&runCfg.Config,
2 changes: 1 addition & 1 deletion private/testplanet/satellite.go
@@ -700,7 +700,7 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})

return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.NewContainment(), rollupsWriteCache, versionInfo, &config, nil)
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), db.NodeEvents(), db.Reputation(), db.Containment(), rollupsWriteCache, versionInfo, &config, nil)
}

type rollupsWriteCacheCloser struct {
10 changes: 2 additions & 8 deletions satellite/audit/containment.go
@@ -22,16 +22,10 @@ var (
ErrContainDelete = errs.Class("unable to delete pending audit")
)

// NewContainment holds information about pending audits for contained nodes.
//
// It will exist side by side with Containment for a few commits in this
// commit chain, to allow the change in reverifications to be made over
// several smaller commits.
//
// Later in the commit chain, NewContainment will be renamed to Containment.
// Containment holds information about pending audits for contained nodes.
//
// architecture: Database
type NewContainment interface {
type Containment interface {
Get(ctx context.Context, nodeID pb.NodeID) (*ReverificationJob, error)
Insert(ctx context.Context, job *PieceLocator) error
Delete(ctx context.Context, job *PieceLocator) (wasDeleted, nodeStillContained bool, err error)
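A minimal usage sketch of the interface above, based only on the signatures shown here (the surrounding satellite wiring is assumed): Insert contains the node named in the locator, Get looks the pending job up by node ID, and Delete reports both whether this particular job existed and whether the node is still contained by another pending job.

package example

import (
    "context"

    "storj.io/storj/satellite/audit"
)

// resolvePendingAudit walks the Containment contract end to end.
func resolvePendingAudit(ctx context.Context, containment audit.Containment, locator *audit.PieceLocator) error {
    // Insert records a pending reverification for locator's node.
    if err := containment.Insert(ctx, locator); err != nil {
        return err
    }

    // Get fetches the pending job for a node; ErrContainedNotFound means
    // the node is not (or no longer) contained.
    job, err := containment.Get(ctx, locator.NodeID)
    if err != nil {
        if audit.ErrContainedNotFound.Has(err) {
            return nil
        }
        return err
    }

    // Delete removes this job. wasDeleted is false if the job was already
    // gone; stillContained is true if other pending jobs keep the node contained.
    wasDeleted, stillContained, err := containment.Delete(ctx, &job.Locator)
    if err != nil {
        return err
    }
    _, _ = wasDeleted, stillContained
    return nil
}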
@@ -20,11 +20,11 @@ import (
"storj.io/storj/satellite/reputation"
)

func TestNewContainInsertAndGet(t *testing.T) {
func TestContainInsertAndGet(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 2,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
containment := planet.Satellites[0].DB.NewContainment()
containment := planet.Satellites[0].DB.Containment()

input := &audit.PieceLocator{
StreamID: testrand.UUID(),
@@ -49,11 +49,11 @@ func TestNewContainInsertAndGet(t *testing.T) {
})
}

func TestNewContainIncrementPendingEntryExists(t *testing.T) {
func TestContainIncrementPendingEntryExists(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
containment := planet.Satellites[0].DB.NewContainment()
containment := planet.Satellites[0].DB.Containment()

info1 := &audit.PieceLocator{
NodeID: planet.StorageNodes[0].ID(),
@@ -86,11 +86,11 @@ func TestNewContainIncrementPendingEntryExists(t *testing.T) {
})
}

func TestNewContainDelete(t *testing.T) {
func TestContainDelete(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
containment := planet.Satellites[0].DB.NewContainment()
containment := planet.Satellites[0].DB.Containment()

// add two reverification jobs for the same node
info1 := &audit.PieceLocator{
@@ -147,11 +147,11 @@ func TestNewContainDelete(t *testing.T) {

// UpdateStats used to remove nodes from containment. It doesn't anymore.
// This is a sanity check.
func TestNewContainUpdateStats(t *testing.T) {
func TestContainUpdateStats(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
containment := planet.Satellites[0].DB.NewContainment()
containment := planet.Satellites[0].DB.Containment()
cache := planet.Satellites[0].DB.OverlayCache()

info1 := &audit.PieceLocator{
2 changes: 1 addition & 1 deletion satellite/audit/getshare_test.go
@@ -61,7 +61,7 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo
sat.Metabase.DB,
newDialer,
sat.Overlay.Service,
sat.DB.NewContainment(),
sat.DB.Containment(),
sat.Orders.Service,
sat.Identity,
sat.Config.Audit.MinBytesPerSecond,
39 changes: 18 additions & 21 deletions satellite/audit/reporter.go
@@ -19,11 +19,10 @@ import (
//
// architecture: Service
type reporter struct {
log *zap.Logger
reputations *reputation.Service
overlay *overlay.Service
// newContainment will be renamed to containment.
newContainment NewContainment
log *zap.Logger
reputations *reputation.Service
overlay *overlay.Service
containment Containment
maxRetries int
maxReverifyCount int32
}
@@ -41,22 +40,21 @@ type Reporter interface {
// succeeded, failed, were offline, have pending audits, or failed for unknown
// reasons and their current reputation status.
type Report struct {
Successes storj.NodeIDList
Fails storj.NodeIDList
Offlines storj.NodeIDList
// PieceAudits will be renamed to PendingAudits.
PieceAudits []*ReverificationJob
Successes storj.NodeIDList
Fails storj.NodeIDList
Offlines storj.NodeIDList
PendingAudits []*ReverificationJob
Unknown storj.NodeIDList
NodesReputation map[storj.NodeID]overlay.ReputationStatus
}

// NewReporter instantiates a reporter.
func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, newContainment NewContainment, maxRetries int, maxReverifyCount int32) Reporter {
func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter {
return &reporter{
log: log,
reputations: reputations,
overlay: overlay,
newContainment: newContainment,
containment: containment,
maxRetries: maxRetries,
maxReverifyCount: maxReverifyCount,
}
@@ -72,7 +70,7 @@ func (reporter *reporter) RecordAudits(ctx context.Context, req Report) {
fails := req.Fails
unknowns := req.Unknown
offlines := req.Offlines
pendingAudits := req.PieceAudits
pendingAudits := req.PendingAudits

reporter.log.Debug("Reporting audits",
zap.Int("successes", len(successes)),
@@ -110,7 +108,7 @@ func (reporter *reporter) RecordAudits(ctx context.Context, req Report) {
reportFailures(tries, "unknown", err, unknowns, nil)
offlines, err = reporter.recordAuditStatus(ctx, offlines, nodesReputation, reputation.AuditOffline)
reportFailures(tries, "offline", err, offlines, nil)
pendingAudits, err = reporter.recordPendingPieceAudits(ctx, pendingAudits, nodesReputation)
pendingAudits, err = reporter.recordPendingAudits(ctx, pendingAudits, nodesReputation)
reportFailures(tries, "pending", err, nil, pendingAudits)
}
}
@@ -132,9 +130,8 @@ func (reporter *reporter) recordAuditStatus(ctx context.Context, nodeIDs storj.N
return failed, errors.Err()
}

// recordPendingPieceAudits updates the containment status of nodes with pending piece audits.
// This function is temporary and will be renamed to recordPendingAudits later in this commit chain.
func (reporter *reporter) recordPendingPieceAudits(ctx context.Context, pendingAudits []*ReverificationJob, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*ReverificationJob, err error) {
// recordPendingAudits updates the containment status of nodes with pending piece audits.
func (reporter *reporter) recordPendingAudits(ctx context.Context, pendingAudits []*ReverificationJob, nodesReputation map[storj.NodeID]overlay.ReputationStatus) (failed []*ReverificationJob, err error) {
defer mon.Task()(&ctx)(&err)
var errlist errs.Group

@@ -164,7 +161,7 @@ func (reporter *reporter) recordPendingPieceAudits(ctx context.Context, pendingA
failed = append(failed, pendingAudit)
continue
}
_, stillContained, err := reporter.newContainment.Delete(ctx, &pendingAudit.Locator)
_, stillContained, err := reporter.containment.Delete(ctx, &pendingAudit.Locator)
if err != nil {
if !ErrContainedNotFound.Has(err) {
errlist.Add(err)
@@ -188,7 +185,7 @@ func (reporter *reporter) recordPendingPieceAudits(ctx context.Context, pendingA
func (reporter *reporter) ReportReverificationNeeded(ctx context.Context, piece *PieceLocator) (err error) {
defer mon.Task()(&ctx)(&err)

err = reporter.newContainment.Insert(ctx, piece)
err = reporter.containment.Insert(ctx, piece)
if err != nil {
return Error.New("failed to queue reverification audit for node: %w", err)
}
@@ -223,7 +220,7 @@ func (reporter *reporter) RecordReverificationResult(ctx context.Context, pendin
// This will get re-added to the reverification queue, but that is idempotent
// and fine. We do need to add it to PendingAudits in order to get the
// maxReverifyCount check.
report.PieceAudits = append(report.PieceAudits, pendingJob)
report.PendingAudits = append(report.PendingAudits, pendingJob)
case OutcomeUnknownError:
report.Unknown = append(report.Unknown, pendingJob.Locator.NodeID)
keepInQueue = false
@@ -237,7 +234,7 @@

// remove from reverifications queue if appropriate
if !keepInQueue {
_, stillContained, err := reporter.newContainment.Delete(ctx, &pendingJob.Locator)
_, stillContained, err := reporter.containment.Delete(ctx, &pendingJob.Locator)
if err != nil {
if !ErrContainedNotFound.Has(err) {
errList.Add(err)
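For callers of this file, the net effect is that an audit Report is assembled with PendingAudits and handed to the reporter, which retries failed submissions internally up to its configured maxRetries. A hedged sketch of such a caller, mirroring the Report definition above (the node lists and reputation map are placeholders supplied by the caller):

package example

import (
    "context"

    "storj.io/common/storj"
    "storj.io/storj/satellite/audit"
    "storj.io/storj/satellite/overlay"
)

// reportAuditRound hands one round's results to the reporter.
func reportAuditRound(ctx context.Context, reporter audit.Reporter,
    succeeded, failed, offline, unknown storj.NodeIDList,
    pending []*audit.ReverificationJob,
    reputations map[storj.NodeID]overlay.ReputationStatus,
) {
    reporter.RecordAudits(ctx, audit.Report{
        Successes:       succeeded,
        Fails:           failed,
        Offlines:        offline,
        Unknown:         unknown,
        PendingAudits:   pending, // renamed from PieceAudits in this commit
        NodesReputation: reputations,
    })
}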
16 changes: 8 additions & 8 deletions satellite/audit/reporter_test.go
@@ -35,8 +35,8 @@ func TestReportPendingAudits(t *testing.T) {
},
}

report := audit.Report{PieceAudits: []*audit.ReverificationJob{&pending}}
containment := satellite.DB.NewContainment()
report := audit.Report{PendingAudits: []*audit.ReverificationJob{&pending}}
containment := satellite.DB.Containment()

audits.Reporter.RecordAudits(ctx, report)

@@ -94,7 +94,7 @@ func TestRecordAuditsCorrectOutcome(t *testing.T) {
Successes: []storj.NodeID{goodNode},
Fails: []storj.NodeID{dqNode},
Unknown: []storj.NodeID{suspendedNode},
PieceAudits: []*audit.ReverificationJob{
PendingAudits: []*audit.ReverificationJob{
{
Locator: audit.PieceLocator{NodeID: pendingNode},
ReverifyCount: 0,
@@ -206,11 +206,11 @@ func TestGracefullyExitedNotUpdated(t *testing.T) {
},
}
report = audit.Report{
Successes: storj.NodeIDList{successNode.ID()},
Fails: storj.NodeIDList{failedNode.ID()},
Offlines: storj.NodeIDList{offlineNode.ID()},
PieceAudits: []*audit.ReverificationJob{&pending},
Unknown: storj.NodeIDList{unknownNode.ID()},
Successes: storj.NodeIDList{successNode.ID()},
Fails: storj.NodeIDList{failedNode.ID()},
Offlines: storj.NodeIDList{offlineNode.ID()},
PendingAudits: []*audit.ReverificationJob{&pending},
Unknown: storj.NodeIDList{unknownNode.ID()},
}
audits.Reporter.RecordAudits(ctx, report)

24 changes: 12 additions & 12 deletions satellite/audit/reverify_test.go
@@ -62,7 +62,7 @@ func TestReverifySuccess(t *testing.T) {
pieceIndex := testrand.Intn(len(segment.Pieces))
piece := segment.Pieces[pieceIndex]

containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()

pending := &audit.PieceLocator{
NodeID: piece.StorageNode,
@@ -120,7 +120,7 @@ func TestReverifyFailMissingShare(t *testing.T) {
})
require.NoError(t, err)

containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()

pieceIndex := testrand.Intn(len(segment.Pieces))
piece := segment.Pieces[pieceIndex]
@@ -212,7 +212,7 @@ func TestReverifyOffline(t *testing.T) {
require.NoError(t, err)

// make sure that pending audit is not removed
containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()
_, err = containment.Get(ctx, pending.NodeID)
require.NoError(t, err)
})
@@ -273,7 +273,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
satellite.Metabase.DB,
dialer,
satellite.Overlay.Service,
satellite.DB.NewContainment(),
satellite.DB.Containment(),
satellite.Orders.Service,
satellite.Identity,
minBytesPerSecond,
@@ -304,7 +304,7 @@ func TestReverifyOfflineDialTimeout(t *testing.T) {
require.NoError(t, err)

// make sure that pending audit is not removed
containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()
_, err = containment.Get(ctx, pending.NodeID)
require.NoError(t, err)
})
@@ -371,7 +371,7 @@ func TestReverifyDeletedSegment(t *testing.T) {
require.NoError(t, err)

// expect that the node was removed from containment since the segment it was contained for has been deleted
containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()
_, err = containment.Get(ctx, piece.StorageNode)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
@@ -433,7 +433,7 @@ func TestReverifyModifiedSegment(t *testing.T) {
PieceNum: int(piece.Number),
}

containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()

err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
require.NoError(t, err)
@@ -508,7 +508,7 @@ func TestReverifyReplacedSegment(t *testing.T) {
PieceNum: int(piece.Number),
}

containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()

err = audits.Reporter.ReportReverificationNeeded(ctx, pending)
require.NoError(t, err)
@@ -582,7 +582,7 @@ func TestReverifyExpired(t *testing.T) {

// expect that the node was removed from containment since the segment it was
// contained for has expired
_, err = satellite.DB.NewContainment().Get(ctx, piece.StorageNode)
_, err = satellite.DB.Containment().Get(ctx, piece.StorageNode)
require.True(t, audit.ErrContainedNotFound.Has(err))
})
}
@@ -631,7 +631,7 @@ func TestReverifySlowDownload(t *testing.T) {

slowPiece := segment.Pieces[0]
slowNode := slowPiece.StorageNode
containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()

pending := &audit.PieceLocator{
NodeID: slowNode,
@@ -696,7 +696,7 @@ func TestReverifyUnknownError(t *testing.T) {

badPiece := segment.Pieces[0]
badNode := badPiece.StorageNode
containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()

pending := &audit.PieceLocator{
NodeID: badNode,
@@ -768,7 +768,7 @@ func TestMaxReverifyCount(t *testing.T) {

slowPiece := segment.Pieces[0]
slowNode := slowPiece.StorageNode
containment := satellite.DB.NewContainment()
containment := satellite.DB.Containment()

pending := &audit.PieceLocator{
NodeID: slowNode,
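The reverify tests above share one pattern: contain a node for a specific piece via ReportReverificationNeeded, exercise (or skip) the reverification, then check containment for that node. A condensed, hypothetical version of that pattern is below; it assumes planet.Satellites[0].Audit.Reporter is the reporter the tests refer to as audits.Reporter, and it leaves unset the piece fields that the real tests derive from an uploaded segment.

package example

import (
    "testing"

    "github.com/stretchr/testify/require"

    "storj.io/common/testcontext"
    "storj.io/storj/private/testplanet"
    "storj.io/storj/satellite/audit"
)

// TestContainmentSketch condenses the reverify-test pattern: queue a
// reverification for a node, then confirm the node is contained.
func TestContainmentSketch(t *testing.T) {
    testplanet.Run(t, testplanet.Config{
        SatelliteCount: 1, StorageNodeCount: 1,
    }, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
        satellite := planet.Satellites[0]
        containment := satellite.DB.Containment()

        pending := &audit.PieceLocator{
            // StreamID and PieceNum would identify a real piece; the tests
            // above derive them from an uploaded segment.
            NodeID: planet.StorageNodes[0].ID(),
        }

        // ReportReverificationNeeded wraps containment.Insert.
        err := satellite.Audit.Reporter.ReportReverificationNeeded(ctx, pending)
        require.NoError(t, err)

        // The node is now contained; a completed reverification (success,
        // failure, or a deleted/expired segment) would remove it again.
        _, err = containment.Get(ctx, pending.NodeID)
        require.NoError(t, err)
    })
}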
