gc: clean expired trash files, delfiles and delslices #3022

Merged
merged 13 commits on Dec 6, 2022
158 changes: 108 additions & 50 deletions cmd/gc.go
@@ -42,7 +42,7 @@ func cmdGC() *cli.Command {
ArgsUsage: "META-URL",
Description: `
It scans all objects in data storage and slices in metadata, comparing them to see if there is any
leaked object. It can also actively trigger compaction of slices.
leaked object. It can also actively trigger compaction of slices and the cleanup of delayed deleted slices or files.
Use this command if you find that data storage takes more than expected.

Examples:
@@ -52,7 +52,7 @@ $ juicefs gc redis://localhost
# Trigger compaction of all slices
$ juicefs gc redis://localhost --compact

# Delete leaked objects
# Delete leaked objects and delayed deleted slices or files
$ juicefs gc redis://localhost --delete`,
Flags: []cli.Flag{
&cli.BoolFlag{
@@ -61,7 +61,7 @@ $ juicefs gc redis://localhost --delete`,
},
&cli.BoolFlag{
Name: "delete",
Usage: "delete leaked objects",
Usage: "delete leaked objects and delayed deleted slices or files",
},
&cli.IntFlag{
Name: "threads",
@@ -110,12 +110,70 @@ func gc(ctx *cli.Context) error {

// Scan all chunks first and do compaction if necessary
progress := utils.NewProgress(false, false)
if ctx.Bool("compact") {
bar := progress.AddCountBar("Compacted chunks", 0)
spin := progress.AddDoubleSpinner("Compacted slices")
// Delete pending slices while listing all slices
delete := ctx.Bool("delete")
threads := ctx.Int("threads")
compact := ctx.Bool("compact")
if (delete || compact) && threads <= 0 {
logger.Fatal("threads should be greater than 0 to delete or compact objects")
}

var wg sync.WaitGroup
var delSpin *utils.Bar
var sliceChan chan *dSlice // pending delete slices

if delete || compact {
delSpin = progress.AddCountSpinner("Deleted pending")
sliceChan = make(chan *dSlice, 10240)
m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
return store.Remove(args[0].(uint64), int(args[1].(uint32)))
delSpin.Increment()
sliceChan <- &dSlice{args[0].(uint64), args[1].(uint32)}
return nil
})
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for s := range sliceChan {
if err := store.Remove(s.id, int(s.length)); err != nil {
logger.Warnf("remove %d_%d: %s", s.id, s.length, err)
}
}
}()
}
}

c := meta.WrapContext(ctx.Context)
delayedFileSpin := progress.AddDoubleSpinner("Delfiles")
cleanedFileSpin := progress.AddDoubleSpinner("Cleaned delfiles")
edge := time.Now().Add(-time.Duration(format.TrashDays) * 24 * time.Hour)
if delete {
cleanTrashSpin := progress.AddCountSpinner("Cleaned trash")
m.CleanupTrashBefore(c, edge, cleanTrashSpin.Increment)
cleanTrashSpin.Done()
}

err = m.ScanDeletedObject(
c,
nil,
func(_ meta.Ino, size uint64, ts int64) (bool, error) {
delayedFileSpin.IncrInt64(int64(size))
if delete {
cleanedFileSpin.IncrInt64(int64(size))
return true, nil
}
return false, nil
},
)
if err != nil {
logger.Fatalf("scan deleted object: %s", err)
}
delayedFileSpin.Done()
cleanedFileSpin.Done()

if compact {
bar := progress.AddCountBar("Compacted chunks", 0)
spin := progress.AddDoubleSpinner("Compacted slices")
m.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
slices := args[0].([]meta.Slice)
err := vfs.Compact(chunkConf, store, slices, args[1].(uint64))
@@ -125,15 +183,15 @@ func gc(ctx *cli.Context) error {
return err
})
if st := m.CompactAll(meta.Background, ctx.Int("threads"), bar); st == 0 {
bar.Done()
spin.Done()
if progress.Quiet {
c, b := spin.Current()
logger.Infof("Compacted %d chunks (%d slices, %d bytes).", bar.Current(), c, b)
}
} else {
logger.Errorf("compact all chunks: %s", st)
}
bar.Done()
spin.Done()
} else {
m.OnMsg(meta.CompactChunk, func(args ...interface{}) error {
return nil // ignore compaction
@@ -143,52 +201,37 @@ func gc(ctx *cli.Context) error {
// put it above delete count spinner
sliceCSpin := progress.AddCountSpinner("Listed slices")

// Delete pending slices while listing all slices
delete := ctx.Bool("delete")
threads := ctx.Int("threads")
if delete && threads <= 0 {
logger.Fatal("threads should be greater than 0 to delete objects")
}
var delSpin *utils.Bar
var sliceChan chan *dSlice // pending delete slices
var wg sync.WaitGroup
if delete {
delSpin = progress.AddCountSpinner("Deleted pending")
sliceChan = make(chan *dSlice, 10240)
m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
delSpin.Increment()
sliceChan <- &dSlice{args[0].(uint64), args[1].(uint32)}
return nil
})
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for s := range sliceChan {
if err := store.Remove(s.id, int(s.length)); err != nil {
logger.Warnf("remove %d_%d: %s", s.id, s.length, err)
}
}
}()
}
}

// List all slices in metadata engine
var c = meta.NewContext(0, 0, []uint32{0})
slices := make(map[meta.Ino][]meta.Slice)
r := m.ListSlices(c, slices, delete, sliceCSpin.Increment)
if r != 0 {
logger.Fatalf("list all slices: %s", r)
}
if delete {
close(sliceChan)
wg.Wait()
delSpin.Done()
if progress.Quiet {
logger.Infof("Deleted %d pending slices", delSpin.Current())
}

delayedSliceSpin := progress.AddDoubleSpinner("Delslices")
cleanedSliceSpin := progress.AddDoubleSpinner("Cleaned delslices")

err = m.ScanDeletedObject(
c,
func(ss []meta.Slice, ts int64) (bool, error) {
for _, s := range ss {
delayedSliceSpin.IncrInt64(int64(s.Size))
if delete && ts < edge.Unix() {
cleanedSliceSpin.IncrInt64(int64(s.Size))
}
}
if delete && ts < edge.Unix() {
return true, nil
}
return false, nil
},
nil,
)
if err != nil {
logger.Fatalf("statistic: %s", err)
}
sliceCSpin.Done()
delayedSliceSpin.Done()
cleanedSliceSpin.Done()

// Scan all objects to find leaked ones
blob = object.WithPrefix(blob, "chunks/")
@@ -313,16 +356,31 @@ func gc(ctx *cli.Context) error {
}
}
}
m.OnMsg(meta.DeleteSlice, func(args ...interface{}) error {
return nil
})
if sliceChan != nil {
close(sliceChan)
}
close(leakedObj)
wg.Wait()
if delete || compact {
delSpin.Done()
if progress.Quiet {
logger.Infof("Deleted %d pending slices", delSpin.Current())
}
}
sliceCSpin.Done()
progress.Done()

vc, _ := valid.Current()
cc, cb := compacted.Current()
lc, lb := leaked.Current()
sc, sb := skipped.Current()
logger.Infof("scanned %d objects, %d valid, %d compacted (%d bytes), %d leaked (%d bytes), %d skipped (%d bytes)",
bar.Current(), vc, cc, cb, lc, lb, sc, sb)
dsc, dsb := cleanedSliceSpin.Current()
fc, fb := cleanedFileSpin.Current()
logger.Infof("scanned %d objects, %d valid, %d compacted (%d bytes), %d leaked (%d bytes), %d delslices (%d bytes), %d delfiles (%d bytes), %d skipped (%d bytes)",
bar.Current(), vc, cc, cb, lc, lb, dsc, dsb, fc, fb, sc, sb)
if lc > 0 && !delete {
logger.Infof("Please add `--delete` to clean leaked objects")
}
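The deletion path in cmd/gc.go above fans slice removals out to a fixed pool of goroutines fed by a buffered channel, so the metadata listing never blocks on object-storage latency. Below is a minimal, self-contained sketch of that channel-plus-WaitGroup pattern; removeSlice is a stand-in for the real store.Remove call, and the IDs and sizes are made up for illustration:

package main

import (
	"fmt"
	"sync"
)

// dSlice mirrors the pending-delete payload in gc.go: a slice ID and its length.
type dSlice struct {
	id     uint64
	length uint32
}

// removeSlice is a placeholder for store.Remove(s.id, int(s.length)).
func removeSlice(s *dSlice) error {
	fmt.Printf("removed %d_%d\n", s.id, s.length)
	return nil
}

func main() {
	const threads = 4
	sliceChan := make(chan *dSlice, 10240) // buffered, as in the patch

	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range sliceChan { // drain until the channel is closed
				if err := removeSlice(s); err != nil {
					fmt.Printf("remove %d_%d: %s\n", s.id, s.length, err)
				}
			}
		}()
	}

	// Producer side: in gc.go this is the lightweight meta.DeleteSlice
	// handler, which only increments a spinner and enqueues the slice.
	for i := uint64(1); i <= 8; i++ {
		sliceChan <- &dSlice{id: i, length: 4 << 20}
	}

	close(sliceChan) // lets the workers drain the backlog and exit
	wg.Wait()
}

Keeping the OnMsg handler cheap and doing the actual removals in workers is also why the patch swaps in a no-op DeleteSlice handler before closing sliceChan at the end of gc: nothing can send on the channel once it is closed.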
16 changes: 9 additions & 7 deletions cmd/status.go
@@ -120,15 +120,17 @@ func status(ctx *cli.Context) error {
fileSpinner := progress.AddDoubleSpinner("Delayed Files")
defer fileSpinner.Done()

err = m.Statistic(
ctx.Context,
func(s meta.Slice) error {
slicesSpinner.IncrInt64(int64(s.Size))
return nil
err = m.ScanDeletedObject(
meta.WrapContext(ctx.Context),
func(ss []meta.Slice, _ int64) (bool, error) {
for _, s := range ss {
slicesSpinner.IncrInt64(int64(s.Size))
}
return false, nil
},
func(_ meta.Ino, size uint64) error {
func(_ meta.Ino, size uint64, _ int64) (bool, error) {
fileSpinner.IncrInt64(int64(size))
return nil
return false, nil
},
)
if err != nil {
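Both call sites rely on the same callback contract: ScanDeletedObject visits delayed slices and files, and each visitor's boolean return tells the engine whether to actually clean the entry (status always returns false to merely tally; gc returns true once --delete is set and the entry is past the trash window). A hedged sketch of a caller using that contract, written as if it were a helper in cmd — cleanOlderThan is hypothetical, with imports of time and pkg/meta assumed:

// cleanOlderThan asks the engine to clean delslices and delfiles whose
// deletion timestamp is older than edge, and to keep anything newer.
func cleanOlderThan(c meta.Context, m meta.Meta, edge time.Time) error {
	return m.ScanDeletedObject(
		c,
		func(ss []meta.Slice, ts int64) (bool, error) {
			return ts < edge.Unix(), nil // true: clean this batch of delslices
		},
		func(_ meta.Ino, _ uint64, ts int64) (bool, error) {
			return ts < edge.Unix(), nil // true: clean this delfile
		},
	)
}

Because the two scans run concurrently in an errgroup (see the base.go changes below), the callbacks must be safe to call from separate goroutines.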
35 changes: 23 additions & 12 deletions pkg/meta/base.go
@@ -17,7 +17,6 @@
package meta

import (
"context"
"encoding/json"
"fmt"
"path"
@@ -82,12 +81,15 @@ type engine interface {
doGetParents(ctx Context, inode Ino) map[Ino]int
doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno

scanDeletedSlices(ctx context.Context, visitor func(s Slice) error) error
scanDeletedFiles(ctx context.Context, visitor func(ino Ino, size uint64) error) error
scanDeletedSlices(Context, deletedSliceScan) error
scanDeletedFiles(Context, deletedFileScan) error

GetSession(sid uint64, detail bool) (*Session, error)
}

type deletedSliceScan func(ss []Slice, ts int64) (clean bool, err error)
type deletedFileScan func(ino Ino, size uint64, ts int64) (clean bool, err error)

// fsStat aligned for atomic operations
// nolint:structcheck
type fsStat struct {
@@ -1363,9 +1365,8 @@ func (m *baseMeta) cleanupTrash() {
}
}

func (m *baseMeta) doCleanupTrash(force bool) {
func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress func()) {
logger.Debugf("cleanup trash: started")
ctx := Background
now := time.Now()
var st syscall.Errno
var entries []*Entry
@@ -1383,7 +1384,6 @@ func (m *baseMeta) doCleanupTrash(force bool) {
}
}()
batch := 1000000
edge := now.Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
for len(entries) > 0 {
e := entries[0]
ts, err := time.Parse("2006-01-02-15", string(e.Name))
@@ -1392,7 +1392,7 @@
entries = entries[1:]
continue
}
if ts.Before(edge) || force {
if ts.Before(edge) {
var subEntries []*Entry
if st = m.en.doReaddir(ctx, e.Inode, 0, &subEntries, batch); st != 0 {
logger.Warnf("readdir subTrash %d: %s", e.Inode, st)
@@ -1411,6 +1414,9 @@
}
if st == 0 {
count++
if increProgress != nil {
increProgress()
}
} else {
logger.Warnf("delete from trash %s/%s: %s", e.Name, se.Name, st)
rmdir = false
@@ -1431,6 +1434,14 @@
}
}

func (m *baseMeta) doCleanupTrash(force bool) {
edge := time.Now().Add(-time.Duration(24*m.fmt.TrashDays+1) * time.Hour)
if force {
edge = time.Now()
}
m.CleanupTrashBefore(Background, edge, nil)
}

func (m *baseMeta) cleanupDelayedSlices() {
now := time.Now()
edge := now.Unix() - int64(m.fmt.TrashDays)*24*3600
@@ -1447,16 +1458,16 @@
}
}

func (m *baseMeta) Statistic(ctx context.Context, slicesDeletedScan func(Slice) error, fileDeletedScan func(ino Ino, size uint64) error) error {
func (m *baseMeta) ScanDeletedObject(ctx Context, sliceScan deletedSliceScan, fileScan deletedFileScan) error {
eg := errgroup.Group{}
if slicesDeletedScan != nil {
if sliceScan != nil {
eg.Go(func() error {
return m.en.scanDeletedSlices(ctx, slicesDeletedScan)
return m.en.scanDeletedSlices(ctx, sliceScan)
})
}
if fileDeletedScan != nil {
if fileScan != nil {
eg.Go(func() error {
return m.en.scanDeletedFiles(ctx, fileDeletedScan)
return m.en.scanDeletedFiles(ctx, fileScan)
})
}
return eg.Wait()
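After the refactor, the cutoff policy lives entirely in the callers: CleanupTrashBefore simply deletes trash entries older than whatever edge it is handed. A condensed sketch of the two policies, written as if beside base.go, with trashDays standing in for m.fmt.TrashDays:

// Background path, from doCleanupTrash above: keep TrashDays of trash
// plus one hour of slack; a forced cleanup collapses the edge to "now".
edge := time.Now().Add(-time.Duration(24*trashDays+1) * time.Hour)
if force {
	edge = time.Now()
}
m.CleanupTrashBefore(Background, edge, nil) // nil: no progress reporting

// gc path, from cmd/gc.go above: exact TrashDays boundary, with a
// spinner callback counting each deleted entry.
m.CleanupTrashBefore(c, time.Now().Add(-time.Duration(trashDays)*24*time.Hour),
	cleanTrashSpin.Increment)

Note that the periodic path keeps one extra hour beyond TrashDays while gc cuts exactly at the boundary, so a gc --delete run can reap entries up to an hour before the background job would.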