/
accounting.go
654 lines (584 loc) · 16.9 KB
/
accounting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
// Package accounting provides an accounting and limiting reader
package accounting
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"unicode/utf8"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/asyncreader"
"github.com/rclone/rclone/fs/fserrors"
)
// Errors reported once the limit set by --max-transfer has been hit.
//
// The Fatal and Graceful variants both wrap ErrorMaxTransferLimitReached.
var (
	// ErrorMaxTransferLimitReached defines error when transfer limit is reached.
	// Used for checking on exit and matching to correct exit code.
	ErrorMaxTransferLimitReached = errors.New("max transfer limit reached as set by --max-transfer")

	// ErrorMaxTransferLimitReachedFatal is returned from Read when the max
	// transfer limit is reached.
	ErrorMaxTransferLimitReachedFatal = fserrors.FatalError(ErrorMaxTransferLimitReached)

	// ErrorMaxTransferLimitReachedGraceful is returned from operations.Copy when the max
	// transfer limit is reached and a graceful stop is required.
	ErrorMaxTransferLimitReachedGraceful = fserrors.NoRetryError(ErrorMaxTransferLimitReached)
)
// Start sets up the accounting, in particular the bandwidth limiting.
//
// It starts, in this order, the token bucket limiter, the bandwidth
// update ticker and the transactions per second limiter, handing ctx
// to each of them.
func Start(ctx context.Context) {
	// Start the token bucket limiter
	TokenBucket.StartTokenBucket(ctx)
	// Start the bandwidth update ticker
	TokenBucket.StartTokenTicker(ctx)
	// Start the transactions per second limiter
	StartLimitTPS(ctx)
}
// Account limits and accounts for one transfer
//
// Bytes passing through it are counted both in its own values and in
// the StatsInfo it was created with.
type Account struct {
	stats *StatsInfo // stats the bytes of this transfer are reported to
	// The mutex is to make sure Read() and Close() aren't called
	// concurrently. Unfortunately the persistent connection loop
	// in http transport calls Read() after Do() returns on
	// CancelRequest so this race can happen when it apparently
	// shouldn't.
	mu          sync.Mutex      // mutex protects these values
	in          io.Reader       // reader Read() reads from - the async buffer if WithBuffer added one
	ctx         context.Context // current context for transfer - may change
	ci          *fs.ConfigInfo  // config looked up from ctx when the Account was created
	origIn      io.ReadCloser   // reader as passed in, underneath any buffer
	close       io.Closer       // what Close() closes - the async buffer if there is one, otherwise origIn
	size        int64           // size of the transfer as passed in at creation
	name        string          // name as passed in - key in stats.inProgress and used in log messages
	closed      bool            // set if the file is closed
	exit        chan struct{}   // channel that will be closed when transfer is finished
	withBuf     bool            // is using a buffered in
	checking    bool            // set if attached transfer is checking
	tokenBucket buckets         // per file bandwidth limiter (may be nil)

	values accountValues // statistics, guarded by their own mutex
}
// accountValues holds statistics for this Account
//
// All fields below mu are read and written with mu held.
type accountValues struct {
	mu      sync.Mutex // Mutex for stat values.
	bytes   int64      // Total number of bytes read
	max     int64      // if >=0 the max number of bytes to transfer
	start   time.Time  // Start time of first read
	lpTime  time.Time  // Time of last average measurement
	lpBytes int        // Number of bytes read since last measurement
	avg     float64    // Moving average of last few measurements in Byte/s
}
// averagePeriod is the period to do exponentially weighted averages
// over. averageLoop counts it in ticks: the weight given to the
// running average grows with each tick until it reaches this value.
const averagePeriod = 16
// newAccountSizeName makes an Account reader for an io.ReadCloser of
// the given size and name
//
// The config is looked up from ctx. A byte limit is only installed
// when the cutoff mode is hard, and a per file token bucket only when
// a per file bandwidth limit is set for the current time.
//
// The Account is registered in stats.inProgress under name and its
// averaging goroutine is started, so Done must be called on it.
func newAccountSizeName(ctx context.Context, stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account {
	ci := fs.GetConfig(ctx)

	// A negative max means no limit is enforced by this Account.
	maxBytes := int64(-1)
	if ci.CutoffMode == fs.CutoffModeHard {
		maxBytes = int64(ci.MaxTransfer)
	}

	acc := &Account{
		stats:  stats,
		in:     in,
		origIn: in,
		close:  in,
		ctx:    ctx,
		ci:     ci,
		size:   size,
		name:   name,
		exit:   make(chan struct{}),
		values: accountValues{
			lpTime: time.Now(),
			max:    maxBytes,
		},
	}

	if limit := ci.BwLimitFile.LimitAt(time.Now()); limit.Bandwidth.IsSet() {
		fs.Debugf(name, "Limiting file transfer to %v", limit.Bandwidth)
		acc.tokenBucket = newTokenBucket(limit.Bandwidth)
	}

	go acc.averageLoop()
	stats.inProgress.set(name, acc)
	return acc
}
// WithBuffer - If the file is above a certain size it adds an Async reader
//
// The amount buffered is the configured BufferSize, or the size of
// the file if that is smaller and not -1. It is rounded down to a
// whole number of asyncreader buffers; if that comes to none, no
// async reader is added.
//
// Calling it again on an Account which has already been through here
// does nothing. A failure to create the async reader is logged and
// the Account is left reading from its current reader.
func (acc *Account) WithBuffer() *Account {
	if acc.withBuf {
		// already have a buffer
		return acc
	}
	acc.withBuf = true

	toBuffer := int64(acc.ci.BufferSize)
	if acc.size != -1 && acc.size < toBuffer {
		toBuffer = acc.size
	}
	buffers := int(toBuffer / asyncreader.BufferSize)
	if buffers <= 0 {
		// too small to be worth buffering
		return acc
	}

	buffered, err := asyncreader.New(acc.ctx, acc.origIn, buffers)
	if err != nil {
		fs.Errorf(acc.name, "Failed to make buffer: %v", err)
		return acc
	}
	acc.in = buffered
	acc.close = buffered
	return acc
}
// HasBuffer - returns true if this Account has an AsyncReader with a buffer
//
// That is, if the reader currently being read from is an
// *asyncreader.AsyncReader.
func (acc *Account) HasBuffer() bool {
	acc.mu.Lock()
	_, isAsync := acc.in.(*asyncreader.AsyncReader)
	acc.mu.Unlock()
	return isAsync
}
// GetReader returns the underlying io.ReadCloser under any Buffer
//
// This is the reader the Account was created with, or the one most
// recently passed to UpdateReader.
func (acc *Account) GetReader() io.ReadCloser {
	acc.mu.Lock()
	underlying := acc.origIn
	acc.mu.Unlock()
	return underlying
}
// GetAsyncReader returns the current AsyncReader or nil if Account is unbuffered
func (acc *Account) GetAsyncReader() *asyncreader.AsyncReader {
	acc.mu.Lock()
	defer acc.mu.Unlock()
	// A failed assertion leaves asyncIn as the nil pointer, which is
	// exactly what is wanted for the unbuffered case.
	asyncIn, _ := acc.in.(*asyncreader.AsyncReader)
	return asyncIn
}
// StopBuffering stops the async buffer doing any more buffering
//
// It does nothing if the Account isn't reading from an async buffer.
// It does not take acc.mu.
func (acc *Account) StopBuffering() {
	asyncIn, ok := acc.in.(*asyncreader.AsyncReader)
	if !ok {
		return
	}
	asyncIn.StopBuffering()
}
// Abandon abandons the async buffer, if there is one, by calling its
// Abandon method.
//
// It does nothing if the Account isn't reading from an async buffer.
// It does not take acc.mu - UpdateReader calls it with the lock held.
func (acc *Account) Abandon() {
	asyncIn, ok := acc.in.(*asyncreader.AsyncReader)
	if !ok {
		return
	}
	asyncIn.Abandon()
}
// UpdateReader updates the underlying io.ReadCloser stopping the
// async buffer (if any) and re-adding it
//
// The Account switches over to ctx and in, is marked as not closed,
// and has its byte counters zeroed.
func (acc *Account) UpdateReader(ctx context.Context, in io.ReadCloser) {
	acc.mu.Lock()
	hadBuffer := acc.withBuf
	if hadBuffer {
		// Drop the old buffer and clear the flag so that
		// WithBuffer below will build a new one.
		acc.Abandon()
		acc.withBuf = false
	}
	acc.in, acc.origIn, acc.close = in, in, in
	acc.ctx = ctx
	acc.closed = false
	if hadBuffer {
		acc.WithBuffer()
	}
	acc.mu.Unlock()

	// Reset counter to stop percentage going over 100%
	v := &acc.values
	v.mu.Lock()
	v.bytes, v.lpBytes = 0, 0
	v.mu.Unlock()
}
// averageLoop calculates averages for the stats in the background
//
// Once a second it works out the rate since the previous measurement
// and folds it into the moving average. It runs until the exit
// channel is closed.
func (acc *Account) averageLoop() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Number of measurements the average is weighted over. Starting
	// from nothing and growing to averagePeriod soft starts the
	// moving average.
	weight := 0.0

	for {
		select {
		case <-acc.exit:
			return
		case now := <-ticker.C:
			v := &acc.values
			v.mu.Lock()
			// Rate over the time since the last measurement
			rate := 0.0
			if secs := now.Sub(v.lpTime).Seconds(); secs > 0 {
				rate = float64(v.lpBytes) / secs
			}
			if weight < averagePeriod {
				weight++
			}
			v.avg = (rate + (weight-1)*v.avg) / weight
			// Start the next measurement from now
			v.lpBytes = 0
			v.lpTime = now
			v.mu.Unlock()
		}
	}
}
// checkReadBefore checks a read is valid before it has happened,
// returning the number of bytes remaining to read.
//
// It returns the context's error if the context is finished. If a max
// is set and the bytes counted by the stats have gone over it, it
// returns ErrorMaxTransferLimitReachedFatal. With no max set the
// bytes remaining are returned as 1<<62.
//
// As a side effect it records the start time on the first call.
func (acc *Account) checkReadBefore() (bytesUntilLimit int64, err error) {
	// Check to see if context is cancelled
	if err = acc.ctx.Err(); err != nil {
		return 0, err
	}

	v := &acc.values
	v.mu.Lock()
	defer v.mu.Unlock()

	// Effectively unlimited unless a max has been set
	bytesUntilLimit = 1 << 62
	if v.max >= 0 {
		bytesUntilLimit = v.max - acc.stats.GetBytes()
		if bytesUntilLimit < 0 {
			return bytesUntilLimit, ErrorMaxTransferLimitReachedFatal
		}
	}

	// Set start time.
	if v.start.IsZero() {
		v.start = time.Now()
	}
	return bytesUntilLimit, nil
}
// checkReadAfter checks the read call after the read has happened
//
// bytesUntilLimit is the value checkReadBefore returned, and n and
// err are the results of the read. If the read stayed within the
// limit, n and err are returned unchanged. Otherwise the overage is
// chopped off n (which never goes below 0) and the error is replaced
// by ErrorMaxTransferLimitReachedFatal.
func (acc *Account) checkReadAfter(bytesUntilLimit int64, n int, err error) (outN int, outErr error) {
	remaining := bytesUntilLimit - int64(n)
	if remaining >= 0 {
		return n, err
	}
	// remaining is negative here - it is how far the read overshot
	n += int(remaining)
	if n < 0 {
		n = 0
	}
	return n, ErrorMaxTransferLimitReachedFatal
}
// ServerSideTransferStart should be called at the start of a server-side transfer
//
// This pretends a transfer has started by recording the start time,
// unless one has been recorded already.
func (acc *Account) ServerSideTransferStart() {
	v := &acc.values
	v.mu.Lock()
	defer v.mu.Unlock()
	if v.start.IsZero() {
		v.start = time.Now()
	}
}
// ServerSideTransferEnd accounts for a read of n bytes in a
// server-side transfer to be treated as a normal transfer.
//
// The bytes are added both to this Account and to the stats.
func (acc *Account) ServerSideTransferEnd(n int64) {
	v := &acc.values
	v.mu.Lock()
	v.bytes += n
	v.mu.Unlock()

	acc.stats.Bytes(n)
}
// serverSideEnd accounts for non specific server side data
//
// The n bytes are passed to the stats' BytesNoNetwork, unless this
// Account is for a check in which case nothing is accounted.
func (acc *Account) serverSideEnd(n int64) {
	if acc.checking {
		return
	}
	acc.stats.BytesNoNetwork(n)
}
// ServerSideCopyEnd accounts for a read of n bytes in a server-side copy
//
// It adds n to the stats' server-side copy total and then accounts
// the bytes with serverSideEnd.
func (acc *Account) ServerSideCopyEnd(n int64) {
	acc.stats.AddServerSideCopy(n)
	acc.serverSideEnd(n)
}
// ServerSideMoveEnd accounts for a read of n bytes in a server-side move
//
// It adds n to the stats' server-side move total and then accounts
// the bytes with serverSideEnd.
func (acc *Account) ServerSideMoveEnd(n int64) {
	acc.stats.AddServerSideMove(n)
	acc.serverSideEnd(n)
}
// DryRun accounts for statistics without running the operation
//
// The n bytes are accounted in the same way as a server-side transfer
// which started and finished immediately.
func (acc *Account) DryRun(n int64) {
	acc.ServerSideTransferStart()
	acc.ServerSideTransferEnd(n)
}
// limitPerFileBandwidth accounts for n bytes from the current file
// bandwidth limit (if any)
//
// It waits on the accounting slot of the per file token bucket for n
// tokens. The wait uses a background context. An error from the wait
// is logged and not returned.
func (acc *Account) limitPerFileBandwidth(n int) {
	acc.values.mu.Lock()
	bucket := acc.tokenBucket[TokenBucketSlotAccounting]
	acc.values.mu.Unlock()

	if bucket == nil {
		// no per file limit
		return
	}
	if err := bucket.WaitN(context.Background(), n); err != nil {
		fs.Errorf(nil, "Token bucket error: %v", err)
	}
}
// accountRead accounts the read and limits bandwidth
//
// The n bytes are added to this Account's counters and to the stats,
// then charged against the global bandwidth limit followed by the per
// file one.
func (acc *Account) accountRead(n int) {
	// Update Stats
	v := &acc.values
	v.mu.Lock()
	v.bytes += int64(n)
	v.lpBytes += n
	v.mu.Unlock()
	acc.stats.Bytes(int64(n))

	// Limit bandwidth
	TokenBucket.LimitBandwidth(TokenBucketSlotAccounting, n)
	acc.limitPerFileBandwidth(n)
}
// read bytes from the io.Reader passed in and account them
//
// If the check before the read fails then in is not read from and
// the error from the check is returned. Otherwise all the bytes read
// are accounted, and the results of the read are passed through
// checkReadAfter, which may chop n and replace the error if the
// transfer limit was exceeded.
func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
	bytesUntilLimit, err := acc.checkReadBefore()
	if err != nil {
		return 0, err
	}
	n, err = in.Read(p)
	acc.accountRead(n)
	return acc.checkReadAfter(bytesUntilLimit, n, err)
}
// Read bytes from the object - see io.Reader
//
// The read is made from the current reader and accounted. acc.mu is
// held for the duration so Read can't run concurrently with Close.
func (acc *Account) Read(p []byte) (n int, err error) {
	acc.mu.Lock()
	defer acc.mu.Unlock()
	return acc.read(acc.in, p)
}
// accountWriteTo is a thin wrapper for w which accounts the bytes
// written to it in acc
type accountWriteTo struct {
	w   io.Writer // where the data is written
	acc *Account  // where the writes are accounted
}
// Write writes p to the wrapped writer and accounts the bytes
// written in the Account - see io.Writer
//
// If the check before the write fails then nothing is written and
// the error from the check is returned. After the write n is chopped
// if it went over the transfer limit, and it is this chopped n which
// is accounted and returned.
func (awt *accountWriteTo) Write(p []byte) (n int, err error) {
	acc := awt.acc
	bytesUntilLimit, err := acc.checkReadBefore()
	if err != nil {
		return 0, err
	}
	n, err = awt.w.Write(p)
	n, err = acc.checkReadAfter(bytesUntilLimit, n, err)
	acc.accountRead(n)
	return n, err
}
// WriteTo writes data to w until there's no more data to write or
// when an error occurs. The return value n is the number of bytes
// written. Any error encountered during the write is also returned.
//
// w is wrapped so that everything written to it is accounted. If the
// current reader is an io.WriterTo then its WriteTo is used, otherwise
// the data is moved with io.Copy. acc.mu is only held while fetching
// the current reader, not for the duration of the copy.
func (acc *Account) WriteTo(w io.Writer) (n int64, err error) {
	acc.mu.Lock()
	in := acc.in
	acc.mu.Unlock()

	accounted := &accountWriteTo{w: w, acc: acc}
	if writerTo, ok := in.(io.WriterTo); ok {
		return writerTo.WriteTo(accounted)
	}
	return io.Copy(accounted, in)
}
// AccountRead accounts for having read n bytes
//
// It makes the same checks as a real read would. If the check before
// fails nothing is accounted and its error is returned. If n goes
// over the transfer limit only the part within the limit is
// accounted and ErrorMaxTransferLimitReachedFatal is returned.
func (acc *Account) AccountRead(n int) (err error) {
	acc.mu.Lock()
	defer acc.mu.Unlock()

	bytesUntilLimit, err := acc.checkReadBefore()
	if err != nil {
		return err
	}
	// There is no read error to pass on here
	n, err = acc.checkReadAfter(bytesUntilLimit, n, nil)
	acc.accountRead(n)
	return err
}
// Close the object
//
// The first call closes the current closer, if there is one, and
// returns its error. Calls made after that do nothing and return nil,
// until UpdateReader marks the Account as open again.
func (acc *Account) Close() error {
	acc.mu.Lock()
	defer acc.mu.Unlock()

	if acc.closed {
		return nil
	}
	acc.closed = true

	if closer := acc.close; closer != nil {
		return closer.Close()
	}
	return nil
}
// Done with accounting - must be called to free accounting goroutine
//
// It closes the exit channel, which stops averageLoop, and clears
// the entry for this Account's name from the stats' in progress
// list.
//
// It is safe to call Done more than once. Calls after the first do
// nothing - closing the exit channel a second time would panic, and
// clearing the in progress entry again could remove an entry which
// has since been set under the same name.
func (acc *Account) Done() {
	acc.mu.Lock()
	defer acc.mu.Unlock()

	// Nothing is ever sent on exit, so being able to receive from it
	// means it has been closed by an earlier call to Done.
	select {
	case <-acc.exit:
		return
	default:
	}

	close(acc.exit)
	acc.stats.inProgress.clear(acc.name)
}
// progress returns bytes read as well as the size.
// Size can be <= 0 if the size is unknown.
//
// It is safe to call on a nil Account, which reports 0 for both.
func (acc *Account) progress() (bytes, size int64) {
	if acc == nil {
		return 0, 0
	}
	acc.values.mu.Lock()
	defer acc.values.mu.Unlock()
	return acc.values.bytes, acc.size
}
// speed returns the speed of the current file transfer
// in bytes per second, as well an exponentially weighted moving average
// If no read has completed yet, 0 is returned for both values.
//
// bps is the bytes read divided by the time since the start of the
// transfer, and current is the moving average kept by averageLoop.
// It is safe to call on a nil Account, which reports 0 for both.
func (acc *Account) speed() (bps, current float64) {
	if acc == nil {
		return 0, 0
	}
	v := &acc.values
	v.mu.Lock()
	defer v.mu.Unlock()

	if v.bytes == 0 {
		return 0, 0
	}
	// Calculate speed from first read. bps stays 0 if no time has
	// passed since then.
	if secs := float64(time.Since(v.start)) / float64(time.Second); secs > 0 {
		bps = float64(v.bytes) / secs
	}
	return bps, v.avg
}
// eta returns the ETA of the current operation,
// rounded to full seconds.
// If the ETA cannot be determined 'ok' returns false.
//
// The calculation is done by the package level eta function from the
// bytes read so far, the size and the moving average speed. A nil
// Account has no ETA.
func (acc *Account) eta() (etaDuration time.Duration, ok bool) {
	if acc == nil {
		return 0, false
	}
	acc.values.mu.Lock()
	defer acc.values.mu.Unlock()
	return eta(acc.values.bytes, acc.size, acc.values.avg)
}
// shortenName shortens in to size runes long
// If size <= 0 then in is left untouched
//
// A name which already fits is returned as it is. A longer one has
// its middle replaced by a single ellipsis rune, keeping the start
// and the end of the name. When the runes to keep can't be split
// evenly the extra one goes to the start.
func shortenName(in string, size int) string {
	if size <= 0 || utf8.RuneCountInString(in) <= size {
		return in
	}
	runes := []rune(in)

	keep := size - 1 // don't count ellipsis rune
	tail := keep / 2
	head := keep - tail

	shortened := make([]rune, 0, size)
	shortened = append(shortened, runes[:head]...)
	shortened = append(shortened, '…')
	shortened = append(shortened, runes[len(runes)-tail:]...)
	return string(shortened)
}
// String produces stats for this file
//
// The result is the name, shortened and padded to the configured
// stats file name length, followed by the percentage done, the size,
// the moving average speed and the ETA. The speed is multiplied by 8
// if the data rate unit is "bits". The ETA is shown as "-" if it
// can't be determined.
func (acc *Account) String() string {
	done, total := acc.progress()
	_, rate := acc.speed()

	etaText := "-"
	if remaining, ok := acc.eta(); ok {
		etaText = "0s"
		if remaining > 0 {
			etaText = fmt.Sprintf("%v", remaining)
		}
	}

	if acc.ci.DataRateUnit == "bits" {
		rate *= 8
	}

	// The percentage stays at 0 if the size isn't known
	percent := 0
	if total > 0 {
		percent = int(100 * float64(done) / float64(total))
	}

	nameLength := acc.ci.StatsFileNameLength
	return fmt.Sprintf("%*s:%3d%% /%s, %s/s, %s",
		nameLength,
		shortenName(acc.name, nameLength),
		percent,
		fs.SizeSuffix(total),
		fs.SizeSuffix(rate),
		etaText,
	)
}
// rcStats adds remote control stats for this file
//
// It sets the keys "bytes", "size", "speed", "speedAvg", "eta",
// "name", "percentage" and "group" in out. "eta" is in seconds, or
// nil if it can't be determined. "percentage" is 0 if the size isn't
// known.
func (acc *Account) rcStats(out rc.Params) {
	done, total := acc.progress()
	overall, average := acc.speed()
	remaining, etaOK := acc.eta()

	percent := 0
	if total > 0 {
		percent = int(100 * float64(done) / float64(total))
	}

	out["bytes"] = done
	out["size"] = total
	out["speed"] = overall
	out["speedAvg"] = average
	if etaOK {
		out["eta"] = remaining.Seconds()
	} else {
		out["eta"] = nil
	}
	out["name"] = acc.name
	out["percentage"] = percent
	out["group"] = acc.stats.group
}
// OldStream returns the top io.Reader
//
// This is the reader Read currently reads from, which is the async
// buffer if there is one.
func (acc *Account) OldStream() io.Reader {
	acc.mu.Lock()
	top := acc.in
	acc.mu.Unlock()
	return top
}
// SetStream updates the top io.Reader
//
// Only the reader which is read from changes - what Close closes and
// what GetReader returns are left as they were.
func (acc *Account) SetStream(in io.Reader) {
	acc.mu.Lock()
	defer acc.mu.Unlock()
	acc.in = in
}
// WrapStream wraps an io Reader so it will be accounted in the same
// way as account
//
// Reads from the returned reader are made from in and accounted in
// acc. The readers held by acc itself are not changed.
func (acc *Account) WrapStream(in io.Reader) io.Reader {
	return &accountStream{
		acc: acc,
		in:  in,
	}
}
// accountStream accounts a single io.Reader into a parent *Account
type accountStream struct {
	acc *Account  // parent which the reads are accounted in
	in  io.Reader // reader which the data comes from
}
// OldStream returns the underlying stream, that is the reader which
// is being accounted
func (a *accountStream) OldStream() io.Reader {
	return a.in
}
// SetStream sets the underlying stream, so that following reads come
// from in
//
// No lock is taken here.
func (a *accountStream) SetStream(in io.Reader) {
	a.in = in
}
// WrapStream wraps in in an accounter
//
// The new accounter has the same parent Account as this one.
func (a *accountStream) WrapStream(in io.Reader) io.Reader {
	return a.acc.WrapStream(in)
}
// Read bytes from the object - see io.Reader
//
// The bytes are read from the underlying stream and accounted in the
// parent Account. Unlike Account.Read this does not take the parent's
// mutex.
func (a *accountStream) Read(p []byte) (n int, err error) {
	return a.acc.read(a.in, p)
}
// Accounter accounts a stream allowing the accounting to be removed and re-added
//
// In this file both *Account and *accountStream have these methods.
type Accounter interface {
	io.Reader
	// OldStream returns the reader which is being accounted
	OldStream() io.Reader
	// SetStream replaces the reader which is being accounted
	SetStream(io.Reader)
	// WrapStream returns the reader passed in wrapped with accounting
	WrapStream(io.Reader) io.Reader
}
// WrapFn wraps an io.Reader (for accounting purposes usually)
//
// UnWrap returns one of these to put the accounting back on a reader.
type WrapFn func(io.Reader) io.Reader
// UnWrap unwraps a reader returning unwrapped and wrap, a function to
// wrap it back up again. If `in` is an Accounter then this function
// will take the accounting off, returning the unwrapped reader, and
// wrap will put it back on again on the new Reader passed in.
//
// If `in` is not an Accounter then it is returned as it is and wrap
// returns its argument unchanged.
//
// This allows functions which wrap io.Readers to move the accounting
// to the end of the wrapped chain of readers. This is very important
// if buffering is being introduced and if the Reader might be wrapped
// again.
func UnWrap(in io.Reader) (unwrapped io.Reader, wrap WrapFn) {
	if acc, ok := in.(Accounter); ok {
		return acc.OldStream(), acc.WrapStream
	}
	// No accounting to move, so wrapping is a no-op
	noWrap := func(r io.Reader) io.Reader { return r }
	return in, noWrap
}
// UnWrapAccounting unwraps a reader returning unwrapped and acc a
// pointer to the accounting.
//
// Only readers made by WrapStream are unwrapped. Anything else is
// returned as it is with a nil acc.
//
// The caller is expected to manage the accounting at this point.
func UnWrapAccounting(in io.Reader) (unwrapped io.Reader, acc *Account) {
	if stream, ok := in.(*accountStream); ok {
		return stream.in, stream.acc
	}
	return in, nil
}