Skip to content

Commit

Permalink
Merge pull request agola-io#160 from sgotti/datamanager_remove_old_da…
Browse files Browse the repository at this point in the history
…ta_files

datamanager: clean old data files
  • Loading branch information
sgotti committed Nov 7, 2019
2 parents e0f152f + 0a2971f commit b77688a
Show file tree
Hide file tree
Showing 4 changed files with 596 additions and 130 deletions.
196 changes: 181 additions & 15 deletions internal/datamanager/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ package datamanager

import (
"bytes"
"container/ring"
"context"
"encoding/json"
"fmt"
"io"
"path"
"regexp"
"sort"
"strings"

Expand All @@ -32,6 +35,12 @@ import (

const (
DefaultMaxDataFileSize = 10 * 1024 * 1024
dataStatusToKeep = 3
)

var (
DataFileRegexp = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)-([a-zA-Z0-9-]+)\.(data|index)$`)
DataStatusFileRegexp = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)\.status$`)
)

type DataStatus struct {
Expand Down Expand Up @@ -527,33 +536,49 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
return bytes.NewReader(de.Data), nil
}

func (d *DataManager) GetLastDataStatusPath() (string, error) {
func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error) {
if n < 1 {
return nil, errors.Errorf("n must be greater than 0")
}
r := ring.New(n)
re := r

doneCh := make(chan struct{})
defer close(doneCh)

var dataStatusPath string
for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
if object.Err != nil {
return "", object.Err
return nil, object.Err
}
if strings.HasSuffix(object.Path, ".status") {
dataStatusPath = object.Path
if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
seq, err := sequence.Parse(m[1])
if err != nil {
d.log.Warnf("cannot parse sequence for data status file %q", object.Path)
continue
}
re.Value = seq
re = re.Next()
} else {
d.log.Warnf("bad file %q found in storage data dir", object.Path)
}
}
if dataStatusPath == "" {
return "", ostypes.ErrNotExist
}

return dataStatusPath, nil
}
dataStatusSequences := []*sequence.Sequence{}
re.Do(func(x interface{}) {
if x != nil {
dataStatusSequences = append([]*sequence.Sequence{x.(*sequence.Sequence)}, dataStatusSequences...)
}
})

func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
dataStatusPath, err := d.GetLastDataStatusPath()
if err != nil {
return nil, err
if len(dataStatusSequences) == 0 {
return nil, ostypes.ErrNotExist
}

dataStatusf, err := d.ost.ReadObject(dataStatusPath)
return dataStatusSequences, nil
}

func (d *DataManager) GetDataStatus(dataSequence *sequence.Sequence) (*DataStatus, error) {
dataStatusf, err := d.ost.ReadObject(d.dataStatusPath(dataSequence))
if err != nil {
return nil, err
}
Expand All @@ -564,6 +589,24 @@ func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
return dataStatus, dec.Decode(&dataStatus)
}

func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) {
dataStatusSequences, err := d.GetLastDataStatusSequences(1)
if err != nil {
return nil, err
}

return dataStatusSequences[0], nil
}

func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
dataStatusSequence, err := d.GetLastDataStatusSequence()
if err != nil {
return nil, err
}

return d.GetDataStatus(dataStatusSequence)
}

func (d *DataManager) Export(ctx context.Context, w io.Writer) error {
if err := d.checkpoint(ctx, true); err != nil {
return err
Expand Down Expand Up @@ -725,3 +768,126 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {

return nil
}

func (d *DataManager) CleanOldCheckpoints(ctx context.Context) error {
dataStatusSequences, err := d.GetLastDataStatusSequences(dataStatusToKeep)
if err != nil {
return err
}

return d.cleanOldCheckpoints(ctx, dataStatusSequences)
}

func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequences []*sequence.Sequence) error {
if len(dataStatusSequences) == 0 {
return nil
}

lastDataStatusSequence := dataStatusSequences[0]

// Remove old data status paths
if len(dataStatusSequences) >= dataStatusToKeep {
dataStatusPathsMap := map[string]struct{}{}
for _, seq := range dataStatusSequences {
dataStatusPathsMap[d.dataStatusPath(seq)] = struct{}{}
}

doneCh := make(chan struct{})
defer close(doneCh)
for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
if object.Err != nil {
return object.Err
}

skip := false
if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
seq, err := sequence.Parse(m[1])
if err == nil && seq.String() > lastDataStatusSequence.String() {
d.log.Infof("skipping file %q since its sequence is greater than %q", object.Path, lastDataStatusSequence)
skip = true
}
}
if skip {
continue
}

if _, ok := dataStatusPathsMap[object.Path]; !ok {
d.log.Infof("removing %q", object.Path)
if err := d.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
return err
}
}
}
}
}

// A list of files to keep
files := map[string]struct{}{}

for _, dataStatusSequence := range dataStatusSequences {
dataStatus, err := d.GetDataStatus(dataStatusSequence)
if err != nil {
return err
}

for dataType := range dataStatus.Files {
for _, file := range dataStatus.Files[dataType] {
files[d.DataFileBasePath(dataType, file.ID)] = struct{}{}
}
}
}

doneCh := make(chan struct{})
defer close(doneCh)

for object := range d.ost.List(d.storageDataDir()+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}

p := object.Path
// object file relative to the storageDataDir
pr := strings.TrimPrefix(p, d.storageDataDir()+"/")
// object file full path without final extension
pne := strings.TrimSuffix(p, path.Ext(p))
// object file base name
pb := path.Base(p)

// skip status files
if !strings.Contains(pr, "/") && strings.HasSuffix(pr, ".status") {
continue
}

// skip data files with a sequence greater than the last known sequence.
// this is to avoid possible conditions where there's a Clean concurrent
// with a running Checkpoint (also if protect by etcd locks, they cannot
// enforce these kind of operations that are acting on resources
// external to etcd during network errors) that will remove the objects
// created by this checkpoint since the data status file doesn't yet
// exist.
skip := false
// extract the data sequence from the object name
if m := DataFileRegexp.FindStringSubmatch(pb); m != nil {
seq, err := sequence.Parse(m[1])
if err == nil && seq.String() > lastDataStatusSequence.String() {
d.log.Infof("skipping file %q since its sequence is greater than %q", p, lastDataStatusSequence)
skip = true
}
}
if skip {
continue
}

if _, ok := files[pne]; !ok {
d.log.Infof("removing %q", object.Path)
if err := d.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
return err
}
}
}
}

return nil
}
70 changes: 39 additions & 31 deletions internal/datamanager/datamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (
// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one

const (
DefaultCheckpointInterval = 10 * time.Second
DefaultEtcdWalsKeepNum = 100
DefaultMinCheckpointWalsNum = 100
DefaultCheckpointInterval = 10 * time.Second
DefaultCheckpointCleanInterval = 5 * time.Minute
DefaultEtcdWalsKeepNum = 100
DefaultMinCheckpointWalsNum = 100
)

var (
Expand Down Expand Up @@ -79,30 +80,32 @@ const (
)

type DataManagerConfig struct {
BasePath string
E *etcd.Store
OST *objectstorage.ObjStorage
DataTypes []string
EtcdWalsKeepNum int
CheckpointInterval time.Duration
BasePath string
E *etcd.Store
OST *objectstorage.ObjStorage
DataTypes []string
EtcdWalsKeepNum int
CheckpointInterval time.Duration
CheckpointCleanInterval time.Duration
// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
MinCheckpointWalsNum int
MaxDataFileSize int64
MaintenanceMode bool
}

type DataManager struct {
basePath string
log *zap.SugaredLogger
e *etcd.Store
ost *objectstorage.ObjStorage
changes *WalChanges
dataTypes []string
etcdWalsKeepNum int
checkpointInterval time.Duration
minCheckpointWalsNum int
maxDataFileSize int64
maintenanceMode bool
basePath string
log *zap.SugaredLogger
e *etcd.Store
ost *objectstorage.ObjStorage
changes *WalChanges
dataTypes []string
etcdWalsKeepNum int
checkpointInterval time.Duration
checkpointCleanInterval time.Duration
minCheckpointWalsNum int
maxDataFileSize int64
maintenanceMode bool
}

func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) {
Expand All @@ -115,6 +118,9 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
if conf.CheckpointInterval == 0 {
conf.CheckpointInterval = DefaultCheckpointInterval
}
if conf.CheckpointCleanInterval == 0 {
conf.CheckpointCleanInterval = DefaultCheckpointCleanInterval
}
if conf.MinCheckpointWalsNum == 0 {
conf.MinCheckpointWalsNum = DefaultMinCheckpointWalsNum
}
Expand All @@ -126,17 +132,18 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
}

d := &DataManager{
basePath: conf.BasePath,
log: logger.Sugar(),
e: conf.E,
ost: conf.OST,
changes: NewWalChanges(conf.DataTypes),
dataTypes: conf.DataTypes,
etcdWalsKeepNum: conf.EtcdWalsKeepNum,
checkpointInterval: conf.CheckpointInterval,
minCheckpointWalsNum: conf.MinCheckpointWalsNum,
maxDataFileSize: conf.MaxDataFileSize,
maintenanceMode: conf.MaintenanceMode,
basePath: conf.BasePath,
log: logger.Sugar(),
e: conf.E,
ost: conf.OST,
changes: NewWalChanges(conf.DataTypes),
dataTypes: conf.DataTypes,
etcdWalsKeepNum: conf.EtcdWalsKeepNum,
checkpointInterval: conf.CheckpointInterval,
checkpointCleanInterval: conf.CheckpointCleanInterval,
minCheckpointWalsNum: conf.MinCheckpointWalsNum,
maxDataFileSize: conf.MaxDataFileSize,
maintenanceMode: conf.MaintenanceMode,
}

// add trailing slash the basepath
Expand Down Expand Up @@ -231,6 +238,7 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
go d.watcherLoop(ctx)
go d.syncLoop(ctx)
go d.checkpointLoop(ctx)
go d.checkpointCleanLoop(ctx)
go d.walCleanerLoop(ctx)
go d.compactChangeGroupsLoop(ctx)
go d.etcdPingerLoop(ctx)
Expand Down

0 comments on commit b77688a

Please sign in to comment.