Don't replace locks and stuff
klauspost committed Mar 25, 2020
1 parent ad766f7 commit 0827e22
Showing 2 changed files with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions cmd/data-usage-tracker.go → cmd/data-update-tracker.go
@@ -51,21 +51,21 @@ const (

var (
ObjUpdatedCh chan<- string
internalDataUsageTracker *dataUsageTracker
internalDataUsageTracker *dataUpdateTracker
)

type dataUsageTracker struct {
type dataUpdateTracker struct {
mu sync.Mutex
input chan string

Current dataUsageFilter
History dataUsageTrackerHistory
Current dataUpdateFilter
History dataUpdateTrackerHistory
Saved time.Time
}

type dataUsageTrackerHistory []dataUsageFilter
type dataUpdateTrackerHistory []dataUpdateFilter

type dataUsageFilter struct {
type dataUpdateFilter struct {
idx uint64
bf bloomFilter
}
@@ -102,9 +102,9 @@ func (b bloomFilter) containsDir(in string) bool {
return b.Test(tmp[:])
}

// sort the dataUsageTrackerHistory, newest first.
// sort the dataUpdateTrackerHistory, newest first.
// Returns whether the history is complete.
func (d dataUsageTrackerHistory) sort() bool {
func (d dataUpdateTrackerHistory) sort() bool {
if len(d) == 0 {
return true
}
@@ -114,7 +114,7 @@ func (d dataUsageTrackerHistory) sort() bool {
return d[0].idx-d[len(d)-1].idx == uint64(len(d))
}

func (d dataUsageTrackerHistory) all() bool {
func (d dataUpdateTrackerHistory) all() bool {
if len(d) == 0 {
return true
}
@@ -125,7 +125,7 @@ func (d dataUsageTrackerHistory) all() bool {
}

// removeOlderThan will remove entries older than index 'n'.
func (d *dataUsageTrackerHistory) removeOlderThan(n uint64) {
func (d *dataUpdateTrackerHistory) removeOlderThan(n uint64) {
d.sort()
dd := *d
end := len(dd)
@@ -139,8 +139,8 @@ func (d *dataUsageTrackerHistory) removeOlderThan(n uint64) {
}

func initDataUsageTracker() {
internalDataUsageTracker = &dataUsageTracker{
Current: dataUsageFilter{
internalDataUsageTracker = &dataUpdateTracker{
Current: dataUpdateFilter{
idx: 1,
},

@@ -150,14 +150,14 @@ func initDataUsageTracker() {
ObjUpdatedCh = internalDataUsageTracker.input
}

func (d *dataUsageTracker) newBloomFilter() bloomFilter {
func (d *dataUpdateTracker) newBloomFilter() bloomFilter {
return bloomFilter{bloom.NewWithEstimates(dataUsageTrackerEstItems, dataUsageTrackerFP)}
}

// start will load the current data from the drives, start collecting information, and
// start a saver goroutine.
// All of these will exit when the context is cancelled.
func (d *dataUsageTracker) start(ctx context.Context, drives []string) {
func (d *dataUpdateTracker) start(ctx context.Context, drives []string) {
d.load(ctx, drives)
go d.startCollector(ctx)
go d.startSaver(ctx, dataUsageTrackerSaveInterval, drives)
@@ -168,7 +168,7 @@ func (d *dataUsageTracker) start(ctx context.Context, drives []string) {
// The newest working cache will be kept in d.
// If no valid data usage tracker can be found, d will remain unchanged.
// If the object is shared, the caller should lock it.
func (d *dataUsageTracker) load(ctx context.Context, drives []string) {
func (d *dataUpdateTracker) load(ctx context.Context, drives []string) {
for _, drive := range drives {
func(drive string) {
cacheFormatPath := pathJoin(drive, minioMetaBucket, dataUsageTrackerFilename)
@@ -193,7 +193,7 @@ func (d *dataUsageTracker) load(ctx context.Context, drives []string) {

// startSaver will start a saver that will write d to all supplied drives at specific intervals.
// The saver will save and exit when supplied context is closed.
func (d *dataUsageTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) {
func (d *dataUpdateTracker) startSaver(ctx context.Context, interval time.Duration, drives []string) {
t := time.NewTicker(interval)
var buf bytes.Buffer
for {
@@ -234,7 +234,7 @@ func (d *dataUsageTracker) startSaver(ctx context.Context, interval time.Duratio
// serialize all data in d to dst.
// Caller should hold lock if d is expected to be shared.
// If an error is returned, there will likely be partial data written to dst.
func (d *dataUsageTracker) serialize(dst io.Writer) (err error) {
func (d *dataUpdateTracker) serialize(dst io.Writer) (err error) {
var tmp [8]byte
o := bufio.NewWriter(dst)
defer func() {
@@ -279,8 +279,8 @@ func (d *dataUsageTracker) serialize(dst io.Writer) (err error) {
return nil
}

func (d *dataUsageTracker) deserialize(src io.Reader, newerThan time.Time) error {
var dst dataUsageTracker
func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error {
var dst dataUpdateTracker
var tmp [8]byte

// Version
@@ -290,7 +290,7 @@ func (d *dataUsageTracker) deserialize(src io.Reader, newerThan time.Time) error
switch tmp[0] {
case dataUsageTrackerVersion:
default:
return errors.New("dataUsageTracker: Unknown data version")
return errors.New("dataUpdateTracker: Unknown data version")
}
// Timestamp.
if _, err := io.ReadFull(src, tmp[:8]); err != nil {
@@ -316,7 +316,7 @@ func (d *dataUsageTracker) deserialize(src io.Reader, newerThan time.Time) error
return err
}
n := binary.LittleEndian.Uint64(tmp[:])
dst.History = make(dataUsageTrackerHistory, int(n))
dst.History = make(dataUpdateTrackerHistory, int(n))
for i, e := range dst.History {
if _, err := io.ReadFull(src, tmp[:8]); err != nil {
return err
@@ -329,13 +329,16 @@ func (d *dataUsageTracker) deserialize(src io.Reader, newerThan time.Time) error
dst.History[i] = e
}
// Ignore what remains on the stream.
*d = dst
// Update d:
d.Current = dst.Current
d.History = dst.History
d.Saved = dst.Saved
return nil
}

// start a collector that picks up entries from ObjUpdatedCh
// and adds them to the current bloom filter.
func (d *dataUsageTracker) startCollector(ctx context.Context) {
func (d *dataUpdateTracker) startCollector(ctx context.Context) {
var tmp [dataUsageHashLen]byte
for {
select {
@@ -389,7 +392,7 @@ func (d *dataUsageTracker) startCollector(ctx context.Context) {
}
}

func (d dataUsageTrackerHistory) find(idx uint64) *dataUsageFilter {
func (d dataUpdateTrackerHistory) find(idx uint64) *dataUpdateFilter {
for _, f := range d {
if f.idx == idx {
return &f
@@ -399,7 +402,7 @@ func (d dataUsageTrackerHistory) find(idx uint64) *dataUsageFilter {
}

// filterFrom will return a combined bloom filter.
func (d *dataUsageTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse {
func (d *dataUpdateTracker) filterFrom(ctx context.Context, oldest, newest uint64) *bloomFilterResponse {
bf := d.newBloomFilter()
bfr := bloomFilterResponse{
OldestIdx: oldest,
@@ -446,7 +449,7 @@
// The response will contain a bloom filter starting at index x up to, but not including index y.
// If y is 0, the response will not update y, but return the currently recorded information
// from up until and including the current y.
func (d *dataUsageTracker) cycleFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) {
func (d *dataUpdateTracker) cycleFilter(ctx context.Context, oldest, current uint64) (*bloomFilterResponse, error) {
d.mu.Lock()
defer d.mu.Unlock()

File renamed without changes.
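Note on the behavioral change: beyond the dataUsageTracker to dataUpdateTracker rename, the substantive edit is in deserialize. Rather than assigning the decoded struct over *d, it now copies only the Current, History and Saved fields, so the receiver's sync.Mutex is left untouched (the "locks and stuff" of the commit message). A minimal sketch of that pattern follows; the tracker type, its fields and the restore method are hypothetical stand-ins for illustration, not the actual MinIO code.

package main

import (
	"fmt"
	"sync"
)

// tracker is a stand-in with the same shape of problem as dataUpdateTracker:
// a mutex guarding a few data fields that get restored from disk.
type tracker struct {
	mu    sync.Mutex
	count int
	note  string
}

// restore copies only the data fields into t and leaves t.mu alone.
// Assigning a whole struct over *t (for example *t = tracker{...}) would also
// overwrite the mutex, silently resetting it even if the caller currently holds
// it; copying field by field, as deserialize now does with Current, History and
// Saved, avoids that.
func (t *tracker) restore(count int, note string) {
	t.count = count
	t.note = note
}

func main() {
	t := &tracker{}
	t.mu.Lock()
	t.restore(3, "loaded from disk") // safe: the lock state of t.mu is preserved
	t.mu.Unlock()
	fmt.Println(t.count, t.note)
}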
