Skip to content

Commit

Permalink
fix: prevent retention service creating orphaned shard files (#24530) (
Browse files Browse the repository at this point in the history
…#24547) (#24548)

Under certain circumstances, the retention service can fail to delete shards from
the store in a timely manner. When the shard groups are pruned based on age, this
leaves orphaned shard files on the disk. The retention service will then not attempt
to remove the obsolete shard files because the meta store does not know about them.
This can cause excessive disk space usage for some users.

This corrects that by requiring shards files be deleted before they can be removed
from the meta store.

fixes: #24529
(cherry picked from commit 7bd3f89)
closes #24545

Co-authored-by: Geoffrey Wossum <gwossum@influxdata.com>
(cherry picked from commit 0dc48b1)
closes #24546
  • Loading branch information
davidby-influx committed Jan 5, 2024
1 parent c169e98 commit 09a9607
Show file tree
Hide file tree
Showing 9 changed files with 597 additions and 191 deletions.
28 changes: 15 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ require (
go.etcd.io/bbolt v1.3.6
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.14.0
golang.org/x/sync v0.4.0
golang.org/x/sys v0.13.0
golang.org/x/text v0.13.0
golang.org/x/crypto v0.16.0
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
golang.org/x/text v0.14.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
golang.org/x/tools v0.14.1-0.20231011210224-b9b97d982b0a
golang.org/x/tools v0.16.0
google.golang.org/protobuf v1.30.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
honnef.co/go/tools v0.4.0
honnef.co/go/tools v0.4.6
)

require (
Expand Down Expand Up @@ -137,7 +138,7 @@ require (
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.0 // indirect
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493 // indirect
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/gofrs/uuid v3.3.0+incompatible // indirect
Expand All @@ -152,9 +153,10 @@ require (
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.6.4 // indirect
github.com/hashicorp/go-rootcerts v1.0.0 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/vault/sdk v0.1.8 // indirect
Expand All @@ -178,10 +180,11 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/miekg/dns v1.1.29 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/mitchellh/mapstructure v1.2.2 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.0.0-20221128092401-c43b287e0e0f // indirect
Expand Down Expand Up @@ -217,12 +220,11 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/api v0.114.0 // indirect
Expand Down
61 changes: 35 additions & 26 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type MetaClient interface {
DropDatabase(name string) error
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
Database(name string) (di *meta.DatabaseInfo)
DropShard(id uint64) error
Databases() []meta.DatabaseInfo
DeleteShardGroup(database, policy string, id uint64) error
PrecreateShardGroups(now, cutoff time.Time) error
Expand Down Expand Up @@ -142,6 +143,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
e.retentionService = retention.NewService(c.RetentionService)
e.retentionService.TSDBStore = e.tsdbStore
e.retentionService.MetaClient = e.metaClient
e.retentionService.DropShardMetaRef = retention.OSSDropShardMetaRef(e.MetaClient())

e.precreatorService = precreator.NewService(c.PrecreatorConfig)
e.precreatorService.MetaClient = e.metaClient
Expand Down
15 changes: 1 addition & 14 deletions v1/services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,24 +668,11 @@ func (c *Client) TruncateShardGroups(t time.Time) error {

// PruneShardGroups remove deleted shard groups from the data store.
func (c *Client) PruneShardGroups() error {
var changed bool
expiration := time.Now().Add(ShardGroupDeletedExpiration)
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
for i, d := range data.Databases {
for j, rp := range d.RetentionPolicies {
var remainingShardGroups []ShardGroupInfo
for _, sgi := range rp.ShardGroups {
if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) {
remainingShardGroups = append(remainingShardGroups, sgi)
continue
}
changed = true
}
data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups
}
}
changed := data.PruneShardGroups(expiration)
if changed {
return c.commit(data)
}
Expand Down
2 changes: 2 additions & 0 deletions v1/services/meta/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,9 @@ func TestMetaClient_PruneShardGroups(t *testing.T) {

data := c.Data()
data.Databases[1].RetentionPolicies[0].ShardGroups[0].DeletedAt = expiration
data.Databases[1].RetentionPolicies[0].ShardGroups[0].Shards = nil
data.Databases[1].RetentionPolicies[0].ShardGroups[1].DeletedAt = expiration
data.Databases[1].RetentionPolicies[0].ShardGroups[1].Shards = nil

if err := c.SetData(&data); err != nil {
t.Fatal(err)
Expand Down
27 changes: 25 additions & 2 deletions v1/services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,11 @@ func (data *Data) DropShard(id uint64) {
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...)

if len(shards) == 1 {
// We just deleted the last shard in the shard group.
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
// We just deleted the last shard in the shard group, but make sure we don't overwrite the timestamp if it
// was already deleted.
if !data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Deleted() {
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
}
}
return
}
Expand Down Expand Up @@ -457,6 +460,26 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
return ErrShardGroupNotFound
}

// PruneShardGroups removes any shards deleted before expiration and that have no remaining owners.
// Returns true if data is modified.
func (data *Data) PruneShardGroups(expiration time.Time) bool {
var changed bool
for i, d := range data.Databases {
for j, rp := range d.RetentionPolicies {
var remainingShardGroups []ShardGroupInfo
for _, sgi := range rp.ShardGroups {
if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) || len(sgi.Shards) > 0 {
remainingShardGroups = append(remainingShardGroups, sgi)
continue
}
changed = true
}
data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups
}
}
return changed
}

// CreateContinuousQuery adds a named continuous query to a database.
func (data *Data) CreateContinuousQuery(database, name, query string) error {
di := data.Database(database)
Expand Down
54 changes: 54 additions & 0 deletions v1/services/retention/helpers/test_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package helpers

import (
"fmt"
"time"

"golang.org/x/exp/slices"

"github.com/influxdata/influxdb/v2/v1/services/meta"
)

// DataDeleteShardGroup deletes the shard group specified by database, policy, and id from targetData.
// It does this by setting the shard group's DeletedAt time to now. We have to reimplement DeleteShardGroup
// instead of using data's so that the DeletedAt time will be deterministic. We are also not testing
// the functionality of DeleteShardGroup. We are testing if DeleteShardGroup gets called correctly.
func DataDeleteShardGroup(targetData *meta.Data, now time.Time, database, policy string, id uint64) error {
rpi, err := targetData.RetentionPolicy(database, policy)

if err != nil {
return err
} else if rpi == nil {
return meta.ErrRetentionPolicyNotFound
}

// Find shard group by ID and set its deletion timestamp.
for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].ID == id {
rpi.ShardGroups[i].DeletedAt = now
return nil
}
}

return meta.ErrShardGroupNotFound
}

// DataNukeShardGroup unconditionally removes the shard group identified by targetDB, targetRP, and targetID
// from targetData. There's no meta.Data method to directly remove a shard group, only to mark it deleted and
// then prune it. We can't use the functionality we're testing to generate the expected result.
func DataNukeShardGroup(targetData *meta.Data, targetDB, targetRP string, targetID uint64) error {
rpi, err := targetData.RetentionPolicy(targetDB, targetRP)
if err != nil {
return err
} else if rpi == nil {
return fmt.Errorf("no retention policy found for %q, %q, %d", targetDB, targetRP, targetID)
}
isTargetShardGroup := func(sgi meta.ShardGroupInfo) bool {
return sgi.ID == targetID
}
if !slices.ContainsFunc(rpi.ShardGroups, isTargetShardGroup) {
return fmt.Errorf("shard not found for %q, %q, %d", targetDB, targetRP, targetID)
}
rpi.ShardGroups = slices.DeleteFunc(rpi.ShardGroups, isTargetShardGroup)
return nil
}
Loading

0 comments on commit 09a9607

Please sign in to comment.