Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions pkg/fileservice/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewDiskCache(
path string,
capacity int,
perfCounterSets []*perfcounter.CounterSet,
asyncLoad bool,
) (ret *DiskCache, err error) {

err = os.MkdirAll(path, 0755)
Expand Down Expand Up @@ -80,21 +81,29 @@ func NewDiskCache(
ret.updatingPaths.Cond = sync.NewCond(new(sync.Mutex))
ret.updatingPaths.m = make(map[string]bool)

ret.loadCache()
if asyncLoad {
go ret.loadCache()
} else {
ret.loadCache()
}

return ret, nil
}

func (d *DiskCache) loadCache() {

var numFiles, numCacheFiles int

_ = filepath.WalkDir(d.path, func(path string, entry os.DirEntry, err error) error {
numFiles++
if err != nil {
return nil //ignore
}
if entry.IsDir() {
// try remove if empty. for cleaning old structure
if path != d.path {
os.Remove(path)
// os.Remove will not delete non-empty directory
_ = os.Remove(path)
}
return nil
}
Expand All @@ -107,10 +116,16 @@ func (d *DiskCache) loadCache() {
}

d.cache.Set(path, struct{}{}, int(fileSize(info)))
numCacheFiles++

return nil
})

logutil.Info("disk cache info loaded",
zap.Any("all files", numFiles),
zap.Any("cache files", numCacheFiles),
)

}

var _ IOVectorCache = new(DiskCache)
Expand Down
15 changes: 9 additions & 6 deletions pkg/fileservice/disk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestDiskCache(t *testing.T) {
})

// new
cache, err := NewDiskCache(ctx, dir, 1<<20, nil)
cache, err := NewDiskCache(ctx, dir, 1<<20, nil, false)
assert.Nil(t, err)

// update
Expand Down Expand Up @@ -125,14 +125,14 @@ func TestDiskCache(t *testing.T) {
testRead(cache)

// new cache instance and read
cache, err = NewDiskCache(ctx, dir, 1<<20, nil)
cache, err = NewDiskCache(ctx, dir, 1<<20, nil, false)
assert.Nil(t, err)
testRead(cache)

assert.Equal(t, 1, numWritten)

// new cache instance and update
cache, err = NewDiskCache(ctx, dir, 1<<20, nil)
cache, err = NewDiskCache(ctx, dir, 1<<20, nil, false)
assert.Nil(t, err)
testUpdate(cache)

Expand All @@ -150,7 +150,7 @@ func TestDiskCacheWriteAgain(t *testing.T) {
var counterSet perfcounter.CounterSet
ctx = perfcounter.WithCounterSet(ctx, &counterSet)

cache, err := NewDiskCache(ctx, dir, 4096, nil)
cache, err := NewDiskCache(ctx, dir, 4096, nil, false)
assert.Nil(t, err)

// update
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestDiskCacheWriteAgain(t *testing.T) {
func TestDiskCacheFileCache(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()
cache, err := NewDiskCache(ctx, dir, 1<<20, nil)
cache, err := NewDiskCache(ctx, dir, 1<<20, nil, false)
assert.Nil(t, err)

vector := IOVector{
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestDiskCacheDirSize(t *testing.T) {

dir := t.TempDir()
capacity := 1 << 20
cache, err := NewDiskCache(ctx, dir, capacity, nil)
cache, err := NewDiskCache(ctx, dir, capacity, nil, false)
assert.Nil(t, err)

data := bytes.Repeat([]byte("a"), capacity/128)
Expand Down Expand Up @@ -330,6 +330,7 @@ func benchmarkDiskCacheWriteThenRead(
dir,
10<<30,
nil,
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -421,6 +422,7 @@ func benchmarkDiskCacheReadRandomOffsetAtLargeFile(
dir,
8<<30,
nil,
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -488,6 +490,7 @@ func BenchmarkDiskCacheMultipleIOEntries(b *testing.B) {
dir,
8<<30,
nil,
false,
)
if err != nil {
b.Fatal(err)
Expand Down
1 change: 1 addition & 0 deletions pkg/fileservice/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error {
*config.DiskPath,
int(*config.DiskCapacity),
l.perfCounterSets,
true,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/fileservice/s3_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (s *S3FS) initCaches(ctx context.Context, config CacheConfig) error {
*config.DiskPath,
int(*config.DiskCapacity),
s.perfCounterSets,
true,
)
if err != nil {
return err
Expand Down