-
Notifications
You must be signed in to change notification settings - Fork 2k
/
runutil.go
198 lines (172 loc) · 5.81 KB
/
runutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
// Package runutil provides helpers to advanced function scheduling control like repeat or retry.
//
// It's very often the case that you need to execute some code at fixed intervals or have it retried automatically.
// To make this reliable, with proper timeouts, you need to carefully arrange some boilerplate.
// The functions below do it for you.
//
// For repeated execution, use Repeat:
//
// err := runutil.Repeat(10*time.Second, stopc, func() error {
// // ...
// })
//
// Retry starts executing closure function f until no error is returned from f:
//
// err := runutil.Retry(10*time.Second, stopc, func() error {
// // ...
// })
//
// For logging an error on each f error, use RetryWithLog:
//
// err := runutil.RetryWithLog(logger, 10*time.Second, stopc, func() error {
// // ...
// })
//
// Another use case for the runutil package is when you want to close a `Closer` interface. As we all know, we should close all implementations of `Closer`, such as *os.File. Commonly we will use:
//
// defer closer.Close()
//
// The problem is that Close() usually can return important error e.g for os.File the actual file flush might happen (and fail) on `Close` method. It's important to *always* check error. Thanos provides utility functions to log every error like those, allowing to put them in convenient `defer`:
//
// defer runutil.CloseWithLogOnErr(logger, closer, "log format message")
//
// For capturing error, use CloseWithErrCapture:
//
// var err error
// defer runutil.CloseWithErrCapture(&err, closer, "log format message")
//
// // ...
//
// If Close() returns error, err will capture it and return by argument.
//
// The runutil.Exhaust* family of functions provides the same functionality but
// they take an io.ReadCloser and they exhaust the whole reader before closing
// them. They are useful when trying to use http keep-alive connections because
// for the same connection to be re-used the whole response body needs to be
// exhausted.
package runutil
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
tsdberrors "github.com/prometheus/prometheus/tsdb/errors"
)
// Repeat executes f every interval seconds until stopc is closed or f returns an error.
// It executes f once right after being called.
func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		// Run f first so callers get one immediate execution before any tick.
		if err := f(); err != nil {
			return err
		}
		select {
		case <-ticker.C:
		case <-stopc:
			return nil
		}
	}
}
// Retry executes f every interval seconds until timeout or no error is returned from f.
// Intermediate errors are discarded (a no-op logger is used); if stopc closes before
// f succeeds, the last error from f is returned. See RetryWithLog to log each failure.
func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
	return RetryWithLog(log.NewNopLogger(), interval, stopc, f)
}
// RetryWithLog executes f every interval seconds until timeout or no error is
// returned from f. It logs an error on each f error.
func RetryWithLog(logger log.Logger, interval time.Duration, stopc <-chan struct{}, f func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var lastErr error
	for {
		if lastErr = f(); lastErr == nil {
			return nil
		}
		level.Error(logger).Log("msg", "function failed. Retrying in next tick", "err", lastErr)
		select {
		case <-ticker.C:
		case <-stopc:
			// Stopped before f ever succeeded; surface the most recent failure.
			return lastErr
		}
	}
}
// CloseWithLogOnErr is making sure we log every error, even those from best effort tiny closers.
// Intended for use in defer statements where the Close error would otherwise be dropped.
// A nil logger falls back to logfmt output on stderr; os.ErrClosed is ignored since
// a double close is harmless.
func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...interface{}) {
	closeErr := closer.Close()
	if closeErr == nil {
		return
	}

	// Not a problem if it has been closed already.
	if errors.Is(closeErr, os.ErrClosed) {
		return
	}

	if logger == nil {
		logger = log.NewLogfmtLogger(os.Stderr)
	}

	msg := fmt.Sprintf(format, a...)
	level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(closeErr, msg))
}
// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before.
// Exhausting the reader lets HTTP keep-alive connections be reused (the transport
// only reuses a connection once the whole body has been consumed).
func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) {
	if _, err := io.Copy(ioutil.Discard, r); err != nil {
		// Guard against a nil logger, consistent with CloseWithLogOnErr; without
		// this, level.Warn on a nil logger would panic instead of logging.
		if logger == nil {
			logger = log.NewLogfmtLogger(os.Stderr)
		}
		level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err)
	}

	CloseWithLogOnErr(logger, r, format, a...)
}
// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
// from caller function).
func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
merr := tsdberrors.MultiError{}
merr.Add(*err)
merr.Add(errors.Wrapf(closer.Close(), format, a...))
*err = merr.Err()
}
// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before.
func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) {
	_, copyErr := io.Copy(ioutil.Discard, r)

	CloseWithErrCapture(err, r, format, a...)

	// Prepend the io.Copy error so the exhaust failure is reported first.
	var merged tsdberrors.MultiError
	merged.Add(copyErr)
	merged.Add(*err)
	*err = merged.Err()
}
// DeleteAllExceptDirs deletes all files and directories inside the given
// dir except for the ignoreDirs directories. Deletion errors are collected
// and returned together rather than aborting on the first failure.
func DeleteAllExceptDirs(dir string, ignoreDirs []string) error {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return errors.Wrap(err, "read dir")
	}

	// Set of directory names to keep, for O(1) lookups.
	ignored := make(map[string]struct{}, len(ignoreDirs))
	for _, name := range ignoreDirs {
		ignored[name] = struct{}{}
	}

	var groupErrs tsdberrors.MultiError
	for _, entry := range entries {
		// Only directories can be ignored; plain files are always removed.
		if entry.IsDir() {
			if _, keep := ignored[entry.Name()]; keep {
				continue
			}
		}
		if err := os.RemoveAll(filepath.Join(dir, entry.Name())); err != nil {
			groupErrs.Add(err)
		}
	}

	if groupErrs != nil {
		return errors.Wrap(groupErrs, "delete file/dir")
	}
	return nil
}