-
Notifications
You must be signed in to change notification settings - Fork 678
/
bucket_backup.go
94 lines (84 loc) · 2.53 KB
/
bucket_backup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package lsmkv
import (
"context"
"io/fs"
"path"
"path/filepath"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/storagestate"
)
// FlushMemtable flushes any active memtable and returns only once the memtable
// has been fully flushed and a stable state on disk has been reached.
//
// This is a preparatory stage for creating backups.
//
// Method should be run only if flushCycle is not running
// (was not started, is stopped, or noop impl is provided)
func (b *Bucket) FlushMemtable() error {
if b.isReadOnly() {
return errors.Wrap(storagestate.ErrStatusReadOnly, "flush memtable")
}
// this lock does not currently _need_ to be
// obtained, as the only other place that
// grabs this lock is the flush cycle, which
// has just been stopped above.
//
// that being said, we will lock here anyway
// as flushLock may be added elsewhere in the
// future
b.flushLock.Lock()
if b.active == nil && b.flushing == nil {
b.flushLock.Unlock()
return nil
}
b.flushLock.Unlock()
stat, err := b.active.commitlog.file.Stat()
if err != nil {
b.logger.WithField("action", "lsm_wal_stat").
WithField("path", b.dir).
WithError(err).
Fatal("bucket backup memtable flush failed")
}
// attempting a flush&switch on when the active memtable
// or WAL is empty results in a corrupted backup attempt
if b.active.Size() > 0 || stat.Size() > 0 {
if err := b.FlushAndSwitch(); err != nil {
return err
}
}
return nil
}
// ListFiles lists all files that currently exist in the Bucket. The files are only
// in a stable state if the memtable is empty, and if compactions are paused. If one
// of those conditions is not given, it errors
func (b *Bucket) ListFiles(ctx context.Context, basePath string) ([]string, error) {
var (
bucketRoot = b.disk.dir
files []string
)
err := filepath.WalkDir(bucketRoot, func(currPath string, d fs.DirEntry, err error) error {
if d.IsDir() {
return nil
}
// ignore .wal files because they are not immutable
if filepath.Ext(currPath) == ".wal" {
return nil
}
files = append(files, path.Join(basePath, path.Base(currPath)))
return nil
})
if err != nil {
return nil, errors.Errorf("failed to list files for bucket: %s", err)
}
return files, nil
}