forked from minio/minio
/
daily-sweeper.go
145 lines (124 loc) · 3.66 KB
/
daily-sweeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
)
// The list of modules listening for the daily listing of all objects
// such as the daily heal ops, disk usage and bucket lifecycle management.
var globalDailySweepListeners = make([]chan string, 0)
var globalDailySweepListenersMu = sync.Mutex{}
// Add a new listener to the daily objects listing
func registerDailySweepListener(ch chan string) {
globalDailySweepListenersMu.Lock()
defer globalDailySweepListenersMu.Unlock()
globalDailySweepListeners = append(globalDailySweepListeners, ch)
}
// Safe copy of globalDailySweepListeners content
func copyDailySweepListeners() []chan string {
globalDailySweepListenersMu.Lock()
defer globalDailySweepListenersMu.Unlock()
var listenersCopy = make([]chan string, len(globalDailySweepListeners))
copy(listenersCopy, globalDailySweepListeners)
return listenersCopy
}
// sweepRound will list all objects, having read quorum or not and
// feeds to all listeners, such as the background healing
func sweepRound(ctx context.Context, objAPI ObjectLayer) error {
zeroDuration := time.Millisecond
zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration)
// General lock so we avoid parallel daily sweep by different instances.
sweepLock := globalNSMutex.NewNSLock(ctx, "system", "daily-sweep")
if err := sweepLock.GetLock(zeroDynamicTimeout); err != nil {
return err
}
defer sweepLock.Unlock()
buckets, err := objAPI.ListBuckets(ctx)
if err != nil {
return err
}
// List all objects, having read quorum or not in all buckets
// and send them to all the registered sweep listeners
for _, bucket := range buckets {
// Send bucket names to all listeners
for _, l := range copyDailySweepListeners() {
l <- bucket.Name
}
marker := ""
for {
res, err := objAPI.ListObjectsHeal(ctx, bucket.Name, "", marker, "", 1000)
if err != nil {
continue
}
for _, obj := range res.Objects {
for _, l := range copyDailySweepListeners() {
l <- pathJoin(bucket.Name, obj.Name)
}
}
if !res.IsTruncated {
break
} else {
marker = res.NextMarker
}
}
}
return nil
}
// initDailySweeper creates a go-routine which will list all
// objects in all buckets in a daily basis
func initDailySweeper() {
go dailySweeper()
}
// List all objects in all buckets in a daily basis
func dailySweeper() {
var lastSweepTime time.Time
var objAPI ObjectLayer
var ctx = context.Background()
// Wait until the object layer is ready
for {
objAPI = newObjectLayerFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
// Perform a sweep round each month
for {
if time.Since(lastSweepTime) < 30*24*time.Hour {
time.Sleep(time.Hour)
continue
}
err := sweepRound(ctx, objAPI)
if err != nil {
switch err.(type) {
// Unable to hold a lock means there is another
// instance doing the sweep round
case OperationTimedOut:
lastSweepTime = time.Now()
default:
logger.LogIf(ctx, err)
time.Sleep(time.Minute)
continue
}
} else {
lastSweepTime = time.Now()
}
}
}