/
progress.go
181 lines (162 loc) · 4.06 KB
/
progress.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package utils
import (
"context"
"encoding/json"
"io"
"sync"
"sync/atomic"
"time"
"github.com/cheggaaa/pb/v3"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)
type logFunc func(msg string, fields ...zap.Field)
// ProgressPrinter prints a progress bar.
type ProgressPrinter struct {
name string
total int64
redirectLog bool
progress int64
closeMu sync.Mutex
closeCh chan struct{}
closed chan struct{}
}
// NewProgressPrinter returns a new progress printer.
func NewProgressPrinter(
name string,
total int64,
redirectLog bool,
) *ProgressPrinter {
return &ProgressPrinter{
name: name,
total: total,
redirectLog: redirectLog,
}
}
// Inc increases the current progress bar.
func (pp *ProgressPrinter) Inc() {
atomic.AddInt64(&pp.progress, 1)
}
// IncBy implements glue.Progress
func (pp *ProgressPrinter) IncBy(cnt int64) {
atomic.AddInt64(&pp.progress, cnt)
}
// GetCurrent get the current progress.
func (pp *ProgressPrinter) GetCurrent() int64 {
return atomic.LoadInt64(&pp.progress)
}
// Close closes the current progress bar.
func (pp *ProgressPrinter) Close() {
pp.closeMu.Lock()
defer pp.closeMu.Unlock()
if pp.closeCh != nil {
select {
case pp.closeCh <- struct{}{}:
default:
}
<-pp.closed
} else {
log.Warn("closing no-started progress printer")
}
}
// goPrintProgress starts a gorouinte and prints progress.
func (pp *ProgressPrinter) goPrintProgress(
ctx context.Context,
logFuncImpl logFunc,
testWriter io.Writer, // Only for tests
) {
bar := pb.New64(pp.total)
if pp.redirectLog || testWriter != nil {
tmpl := `{"P":"{{percent .}}","C":"{{counters . }}","E":"{{etime .}}","R":"{{rtime .}}","S":"{{speed .}}"}`
bar.SetTemplateString(tmpl)
bar.SetRefreshRate(2 * time.Minute)
bar.Set(pb.Static, false) // Do not update automatically
bar.Set(pb.ReturnSymbol, false) // Do not append '\r'
bar.Set(pb.Terminal, false) // Do not use terminal width
// Hack! set Color to avoid separate progress string
bar.Set(pb.Color, true)
if logFuncImpl == nil {
logFuncImpl = log.Info
}
bar.SetWriter(&wrappedWriter{name: pp.name, log: logFuncImpl})
} else {
tmpl := `{{string . "barName" | green}} {{ bar . "<" "-" (cycle . "-" "\\" "|" "/" ) "." ">"}} {{percent .}}`
bar.SetTemplateString(tmpl)
bar.Set("barName", pp.name)
}
if testWriter != nil {
bar.SetWriter(testWriter)
bar.SetRefreshRate(2 * time.Second)
}
bar.Start()
closeCh := make(chan struct{}, 1)
closed := make(chan struct{})
pp.closeMu.Lock()
pp.closeCh = closeCh
pp.closed = closed
pp.closeMu.Unlock()
go func() {
defer close(closed)
t := time.NewTicker(time.Second)
defer t.Stop()
defer bar.Finish()
for {
select {
case <-ctx.Done():
// a hacky way to adapt the old behavior:
// when canceled by the context, leave the progress unchanged.
return
case <-closeCh:
// a hacky way to adapt the old behavior:
// when canceled by Close method (the 'internal' way), push the progress to 100%.
bar.SetCurrent(pp.total)
return
case <-t.C:
}
currentProgress := atomic.LoadInt64(&pp.progress)
if currentProgress <= pp.total {
bar.SetCurrent(currentProgress)
} else {
bar.SetCurrent(pp.total)
}
}
}()
}
type wrappedWriter struct {
name string
log logFunc
}
func (ww *wrappedWriter) Write(p []byte) (int, error) {
var info struct {
P string
C string
E string
R string
S string
}
if err := json.Unmarshal(p, &info); err != nil {
return 0, errors.Trace(err)
}
ww.log("progress",
zap.String("step", ww.name),
zap.String("progress", info.P),
zap.String("count", info.C),
zap.String("speed", info.S),
zap.String("elapsed", info.E),
zap.String("remaining", info.R))
return len(p), nil
}
// StartProgress starts progress bar.
func StartProgress(
ctx context.Context,
name string,
total int64,
redirectLog bool,
log logFunc,
) *ProgressPrinter {
progress := NewProgressPrinter(name, total, redirectLog)
progress.goPrintProgress(ctx, log, nil)
return progress
}