Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/test maybe compact single epoch #547

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/epoch/epoch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) {

idxCount := p.GetEpochAdvanceOnCountThreshold()
// Create sufficient indexes blobs and move clock forward to advance epoch.
for j := 0; j < 3; j++ {
for j := 0; j < 4; j++ {
for i := 0; i < idxCount; i++ {
if i == idxCount-1 {
// Advance the time so that the difference in times for writes will force
Expand All @@ -848,6 +848,8 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) {

te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i))
}

require.NoError(t, te.mgr.MaybeAdvanceWriteEpoch(ctx))
}

compactionError := errors.New("test compaction error")
Expand Down
96 changes: 82 additions & 14 deletions repo/maintenance/maintenance_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"

"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
Expand All @@ -36,16 +37,20 @@ type TaskType string

// Task IDs.
const (
TaskSnapshotGarbageCollection = "snapshot-gc"
TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs"
TaskDeleteOrphanedBlobsFull = "full-delete-blobs"
TaskRewriteContentsQuick = "quick-rewrite-contents"
TaskRewriteContentsFull = "full-rewrite-contents"
TaskDropDeletedContentsFull = "full-drop-deleted-content"
TaskIndexCompaction = "index-compaction"
TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time"
TaskCleanupLogs = "cleanup-logs"
TaskCleanupEpochManager = "cleanup-epoch-manager"
TaskSnapshotGarbageCollection = "snapshot-gc"
TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs"
TaskDeleteOrphanedBlobsFull = "full-delete-blobs"
TaskRewriteContentsQuick = "quick-rewrite-contents"
TaskRewriteContentsFull = "full-rewrite-contents"
TaskDropDeletedContentsFull = "full-drop-deleted-content"
TaskIndexCompaction = "index-compaction"
TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time"
TaskCleanupLogs = "cleanup-logs"
TaskEpochAdvance = "advance-epoch"
TaskEpochDeleteSupersededIndexes = "delete-superseded-epoch-indexes"
TaskEpochCleanupMarkers = "cleanup-epoch-markers"
TaskEpochGenerateRange = "generate-epoch-range-index"
TaskEpochCompactSingle = "compact-single-epoch"
)

// shouldRun returns Mode if repository is due for periodic maintenance.
Expand Down Expand Up @@ -294,6 +299,10 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa
notDeletingOrphanedBlobs(ctx, s, safety)
}

if err := runTaskEpochMaintenanceQuick(ctx, runParams, s); err != nil {
return errors.Wrap(err, "error running quick epoch maintenance tasks")
}

// consolidate many smaller indexes into fewer larger ones.
if err := runTaskIndexCompactionQuick(ctx, runParams, s, safety); err != nil {
return errors.Wrap(err, "error performing index compaction")
Expand Down Expand Up @@ -327,7 +336,14 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul
})
}

func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s *Schedule) error {
func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error {
return ReportRun(ctx, runParams.rep, TaskEpochAdvance, s, func() error {
log(ctx).Infof("Cleaning up no-longer-needed epoch markers...")
return errors.Wrap(em.MaybeAdvanceWriteEpoch(ctx), "error advancing epoch marker")
})
}

func runTaskEpochMaintenanceQuick(ctx context.Context, runParams RunParameters, s *Schedule) error {
em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx)
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
Expand All @@ -337,9 +353,61 @@ func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s
return nil
}

return ReportRun(ctx, runParams.rep, TaskCleanupEpochManager, s, func() error {
err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
log(ctx).Infof("Compacting an eligible uncompacted epoch...")
return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch")
})
if err != nil {
return err
}

return runTaskEpochAdvance(ctx, em, runParams, s)
}

func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s *Schedule) error {
em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx)
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
}

if !hasEpochManager {
return nil
}

// compact a single epoch
if err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
log(ctx).Infof("Compacting an eligible uncompacted epoch...")
return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch")
}); err != nil {
return err
}

if err := runTaskEpochAdvance(ctx, em, runParams, s); err != nil {
return err
}

// compact range
if err := ReportRun(ctx, runParams.rep, TaskEpochGenerateRange, s, func() error {
log(ctx).Infof("Attempting to compact a range of epoch indexes ...")

return errors.Wrap(em.MaybeGenerateRangeCheckpoint(ctx), "error creating epoch range indexes")
}); err != nil {
return err
}

// clean up epoch markers
err := ReportRun(ctx, runParams.rep, TaskEpochCleanupMarkers, s, func() error {
log(ctx).Infof("Cleaning up unneeded epoch markers...")

return errors.Wrap(em.CleanupMarkers(ctx), "error removing epoch markers")
})
if err != nil {
return err
}

return ReportRun(ctx, runParams.rep, TaskEpochDeleteSupersededIndexes, s, func() error {
log(ctx).Infof("Cleaning up old index blobs which have already been compacted...")
return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error cleaning up superseded index blobs")
return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error removing superseded epoch index blobs")
})
}

Expand Down Expand Up @@ -451,7 +519,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
log(ctx).Debug("Extending object lock retention-period is disabled.")
}

if err := runTaskCleanupEpochManager(ctx, runParams, s); err != nil {
if err := runTaskEpochMaintenanceFull(ctx, runParams, s); err != nil {
return errors.Wrap(err, "error cleaning up epoch manager")
}

Expand Down
Loading