Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/periodically scan disk #1861

Merged
merged 27 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c2baf2f
add disk_use config options
parkerduckworth Mar 16, 2022
f86f618
implement disk use scan
parkerduckworth Mar 16, 2022
c1a6697
Merge branch 'gh-1839-read-only-shards' into feature/periodically-sca…
parkerduckworth Mar 16, 2022
cfc7ad1
Merge branch 'gh-1839-read-only-shards' into feature/periodically-sca…
parkerduckworth Mar 16, 2022
0f438a0
Merge branch 'gh-1839-read-only-shards' into feature/periodically-sca…
parkerduckworth Mar 17, 2022
4a6a9fc
add back Shard.initStatus after merge
parkerduckworth Mar 17, 2022
47f5375
Merge branch 'master' into feature/periodically-scan-disk
parkerduckworth Mar 17, 2022
966dd79
go mod tidy
parkerduckworth Mar 17, 2022
804fbef
Merge branch 'master' into feature/periodically-scan-disk
parkerduckworth Mar 17, 2022
c0a411a
add back build tags that travisbot removed
parkerduckworth Mar 17, 2022
469ef68
remove TODOs
parkerduckworth Mar 17, 2022
f49acd2
make disk scan goroutine cancel-able
parkerduckworth Mar 17, 2022
52f7384
only scan disk use if user provides config to do so
parkerduckworth Mar 17, 2022
52ff788
emit disk use warnings with backoff intervals
parkerduckworth Mar 21, 2022
fe08383
resolve merge conflicts
parkerduckworth Mar 21, 2022
b157219
linter
parkerduckworth Mar 21, 2022
13fc6d8
set default disk use vals, streamline warn logs
parkerduckworth Mar 21, 2022
da0fd10
add disk use config to test repo
parkerduckworth Mar 21, 2022
257b132
increase log verbosity to debug
parkerduckworth Mar 21, 2022
077c5f5
increase log verbosity to debug
parkerduckworth Mar 21, 2022
b33717a
increase log verbosity to debug
parkerduckworth Mar 21, 2022
0589be1
increase log verbosity to debug
parkerduckworth Mar 21, 2022
fb3a6e8
increase log verbosity to debug
parkerduckworth Mar 21, 2022
30578ba
add default disk scan config to tests
parkerduckworth Mar 21, 2022
90e5ccf
change log level from fatal to error
parkerduckworth Mar 22, 2022
d9168a3
remove debug logs
parkerduckworth Mar 22, 2022
9af573a
[skip ci] remove go test verbose flag
parkerduckworth Mar 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions adapters/handlers/rest/configure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@ func configureAPI(api *operations.WeaviateAPI) http.Handler {
// TODO: configure http transport for efficient intra-cluster comm
remoteIndexClient := clients.NewRemoteIndex(clusterHttpClient)
repo := db.New(appState.Logger, db.Config{
RootPath: appState.ServerConfig.Config.Persistence.DataPath,
QueryLimit: appState.ServerConfig.Config.QueryDefaults.Limit,
QueryMaximumResults: appState.ServerConfig.Config.QueryMaximumResults,
RootPath: appState.ServerConfig.Config.Persistence.DataPath,
QueryLimit: appState.ServerConfig.Config.QueryDefaults.Limit,
QueryMaximumResults: appState.ServerConfig.Config.QueryMaximumResults,
DiskUseWarningPercentage: appState.ServerConfig.Config.DiskUse.WarningPercentage,
DiskUseReadOnlyPercentage: appState.ServerConfig.Config.DiskUse.ReadOnlyPercentage,
}, remoteIndexClient, appState.Cluster) // TODO client
vectorMigrator = db.NewMigrator(repo, appState.Logger)
vectorRepo = repo
Expand Down
12 changes: 10 additions & 2 deletions adapters/repos/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ func (i *Index) updateVectorIndexConfig(ctx context.Context,
}

type IndexConfig struct {
RootPath string
ClassName schema.ClassName
RootPath string
ClassName schema.ClassName
DiskUseWarningPercentage uint64
DiskUseReadOnlyPercentage uint64
}

func indexID(class schema.ClassName) string {
Expand Down Expand Up @@ -908,3 +910,9 @@ func (i *Index) updateShardStatus(shardName, targetStatus string) error {

return shard.updateStatus(targetStatus)
}

func (i *Index) notifyReady() {
for _, shd := range i.Shards {
shd.notifyReady()
}
}
6 changes: 4 additions & 2 deletions adapters/repos/db/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ func (d *DB) init(ctx context.Context) error {
}

idx, err := NewIndex(ctx, IndexConfig{
ClassName: schema.ClassName(class.Class),
RootPath: d.config.RootPath,
ClassName: schema.ClassName(class.Class),
RootPath: d.config.RootPath,
DiskUseWarningPercentage: d.config.DiskUseWarningPercentage,
DiskUseReadOnlyPercentage: d.config.DiskUseReadOnlyPercentage,
}, d.schemaGetter.ShardingState(class.Class), invertedConfig,
class.VectorIndexConfig.(schema.VectorIndexConfig),
d.schemaGetter, d, d.logger, d.nodeResolver, d.remoteClient)
Expand Down
6 changes: 4 additions & 2 deletions adapters/repos/db/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ func (m *Migrator) AddClass(ctx context.Context, class *models.Class,
shardState *sharding.State) error {
idx, err := NewIndex(ctx,
IndexConfig{
ClassName: schema.ClassName(class.Class),
RootPath: m.db.config.RootPath,
ClassName: schema.ClassName(class.Class),
RootPath: m.db.config.RootPath,
DiskUseWarningPercentage: m.db.config.DiskUseWarningPercentage,
DiskUseReadOnlyPercentage: m.db.config.DiskUseReadOnlyPercentage,
},
shardState,
// no backward-compatibility check required, since newly added classes will
Expand Down
19 changes: 15 additions & 4 deletions adapters/repos/db/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ func (d *DB) SetSchemaGetter(sg schemaUC.SchemaGetter) {
}

func (d *DB) WaitForStartup(ctx context.Context) error {
return d.init(ctx)
err := d.init(ctx)
if err != nil {
return err
}

for _, idx := range d.indices {
idx.notifyReady()
}

return nil
}

func New(logger logrus.FieldLogger, config Config,
Expand All @@ -50,9 +59,11 @@ func New(logger logrus.FieldLogger, config Config,
}

type Config struct {
RootPath string
QueryLimit int64
QueryMaximumResults int64
RootPath string
QueryLimit int64
QueryMaximumResults int64
DiskUseWarningPercentage uint64
DiskUseReadOnlyPercentage uint64
}

// GetIndex returns the index if it exists or nil if it doesn't
Expand Down
19 changes: 16 additions & 3 deletions adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Shard struct {
propertyIndices propertyspecific.Indices
deletedDocIDs *docid.InMemDeletedTracker
cleanupInterval time.Duration
cleanupCancel chan struct{}
cancel chan struct{}
propLengths *inverted.PropertyLengthTracker
randomSource *bufferedRandomGen
versioner *shardVersioner
Expand All @@ -72,8 +72,8 @@ func NewShard(ctx context.Context, shardName string, index *Index) (*Shard, erro
deletedDocIDs: docid.NewInMemDeletedTracker(),
cleanupInterval: time.Duration(index.invertedIndexConfig.
CleanupIntervalSeconds) * time.Second,
cleanupCancel: make(chan struct{}),
randomSource: rand,
cancel: make(chan struct{}, 1),
parkerduckworth marked this conversation as resolved.
Show resolved Hide resolved
randomSource: rand,
}

hnswUserConfig, ok := index.vectorIndexUserConfig.(hnsw.UserConfig)
Expand Down Expand Up @@ -173,6 +173,8 @@ func (s *Shard) drop(force bool) error {
return storagestate.ErrStatusReadOnly
}

s.cancel <- struct{}{}
parkerduckworth marked this conversation as resolved.
Show resolved Hide resolved

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()

Expand Down Expand Up @@ -302,9 +304,20 @@ func (s *Shard) updateVectorIndexConfig(ctx context.Context,
}

func (s *Shard) shutdown(ctx context.Context) error {
s.cancel <- struct{}{}

if err := s.propLengths.Close(); err != nil {
return errors.Wrap(err, "close prop length tracker")
}

return s.store.Shutdown(ctx)
}

func (s *Shard) notifyReady() {
s.initStatus()
s.index.logger.
WithField("action", "startup").
Debugf("shard=%s is ready", s.name)

s.scanDiskUse()
}
95 changes: 95 additions & 0 deletions adapters/repos/db/shard_disk_use.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package db

import (
"fmt"
"syscall"
"time"

"github.com/semi-technologies/weaviate/entities/storagestate"
)

type diskUse struct {
total uint64
avail uint64
}

func (d diskUse) percentUsed() float64 {
used := d.total - d.avail
return (float64(used) / float64(d.total)) * 100
}

func (d diskUse) String() string {
GB := 1024 * 1024 * 1024

return fmt.Sprintf("total: %.2fGB, avail: %.2fGB, used: %.2fGB",
float64(d.total)/float64(GB),
float64(d.avail)/float64(GB),
float64(d.total-d.avail)/float64(GB))
}

func (s *Shard) scanDiskUse() {
go func() {
for {
t := time.Tick(time.Second * 30)
select {
case <-s.cancel:
return
case <-t:
fs := syscall.Statfs_t{}
diskPath := s.index.Config.RootPath

err := syscall.Statfs(diskPath, &fs)
if err != nil {
s.index.logger.WithField("action", "read_disk_use").
WithField("path", diskPath).
Fatalf("failed to read disk usage: %s", err)
}

du := diskUse{
total: fs.Blocks * uint64(fs.Bsize),
avail: fs.Bavail * uint64(fs.Bsize),
}

s.diskUseWarn(du, diskPath)
s.diskUseReadonly(du, diskPath)
}
}
}()
}

// logs a warning if user-set threshold is surpassed
func (s *Shard) diskUseWarn(du diskUse, diskPath string) {
if pu := du.percentUsed(); pu > float64(s.index.Config.DiskUseWarningPercentage) {
if s.index.Config.DiskUseWarningPercentage > 0 {
s.index.logger.WithField("action", "read_disk_use").
WithField("shard", s.name).
WithField("path", diskPath).
Warnf("disk usage currently at %.2f%%", pu)

s.index.logger.WithField("action", "disk_use_stats").
WithField("shard", s.name).
WithField("path", diskPath).
Debugf("%s", du.String())
}
}
}

// sets the shard to readonly if user-set threshold is surpassed
func (s *Shard) diskUseReadonly(du diskUse, diskPath string) {
if pu := du.percentUsed(); pu > float64(s.index.Config.DiskUseReadOnlyPercentage) {
if !s.isReadOnly() && s.index.Config.DiskUseReadOnlyPercentage > 0 {
err := s.updateStatus(storagestate.StatusReadOnly.String())
if err != nil {
s.index.logger.WithField("action", "set_shard_read_only").
WithField("shard", s.name).
WithField("path", s.index.Config.RootPath).
Fatal("failed to set to READONLY")
}

s.index.logger.WithField("action", "set_shard_read_only").
WithField("shard", s.name).
WithField("path", diskPath).
Warnf("disk usage currently at %.2f%%, %s set READONLY", pu, s.name)
}
}
}
7 changes: 7 additions & 0 deletions adapters/repos/db/shard_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ import (
"github.com/semi-technologies/weaviate/entities/storagestate"
)

func (s *Shard) initStatus() {
s.statusLock.Lock()
defer s.statusLock.Unlock()

s.status = storagestate.StatusReady
}

func (s *Shard) getStatus() storagestate.Status {
s.statusLock.Lock()
defer s.statusLock.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions adapters/repos/db/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
// CONTACT: hello@semi.technology
//

//go:build integrationTest
// +build integrationTest

package db

import (
Expand Down
15 changes: 1 addition & 14 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,35 +1,25 @@
module github.com/semi-technologies/weaviate

require (
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go v1.34.28 // indirect
github.com/bmatcuk/doublestar v1.1.3
github.com/buger/jsonparser v1.1.1
github.com/coreos/go-oidc v2.0.0+incompatible
github.com/danaugrs/go-tsne v0.0.0-20200708172100-6b7d1d577fd3
github.com/davecgh/go-spew v1.1.1
github.com/fatih/camelcase v1.0.0
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 // indirect
github.com/go-openapi/errors v0.20.2
github.com/go-openapi/loads v0.21.1
github.com/go-openapi/runtime v0.23.2
github.com/go-openapi/spec v0.20.4
github.com/go-openapi/strfmt v0.21.2
github.com/go-openapi/swag v0.21.1
github.com/go-openapi/validate v0.21.0
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang-jwt/jwt/v4 v4.0.0
github.com/google/uuid v1.2.0
github.com/graphql-go/graphql v0.7.9
github.com/hashicorp/memberlist v0.2.4
github.com/jessevdk/go-flags v1.4.0
github.com/kisielk/errcheck v1.2.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/pty v1.1.5 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/nyaruka/phonenumbers v1.0.54
github.com/pborman/uuid v1.2.0 // indirect
github.com/pkg/errors v0.9.1
github.com/pquerna/cachecontrol v0.0.0-20201205024021-ac21108117ac // indirect
github.com/rs/cors v1.5.0
Expand All @@ -39,17 +29,14 @@ require (
github.com/square/go-jose v2.3.0+incompatible
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/vektah/gqlparser v1.1.2 // indirect
github.com/willf/bitset v1.1.11 // indirect
github.com/willf/bloom v2.0.3+incompatible
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect
go.etcd.io/bbolt v1.3.5
go.mongodb.org/mongo-driver v1.8.4 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
golang.org/x/tools v0.1.10 // indirect
gonum.org/v1/gonum v0.9.1
google.golang.org/grpc v1.24.0
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
Expand Down