-
Notifications
You must be signed in to change notification settings - Fork 240
/
polling.go
102 lines (81 loc) · 1.84 KB
/
polling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package logs
import (
"context"
"time"
"github.com/azazeal/pause"
"github.com/pkg/errors"
"github.com/superfly/flyctl/api"
)
// pollingStream is the value NewPollingStream hands back as a LogStream. It
// produces log entries by running Poll in a background goroutine.
type pollingStream struct {
	// apiClient is the API client passed to Poll on every Stream call.
	apiClient *api.Client

	// err records what the most recent Poll run returned; it is exposed
	// through Err.
	err error
}
// NewPollingStream builds a LogStream that fetches logs through client.
//
// opts is accepted but not consulted here; the options that matter are the
// ones later given to Stream. The returned error is always nil.
func NewPollingStream(client *api.Client, opts *LogOptions) (LogStream, error) {
	stream := new(pollingStream)
	stream.apiClient = client

	return stream, nil
}
// Stream starts a goroutine that runs Poll with the given context and options
// and returns the receive side of the unbuffered channel Poll writes to.
//
// The channel is closed once Poll returns. Poll's result is stored before the
// close happens, so it can be read through Err after the channel is drained.
func (s *pollingStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry {
	entries := make(chan LogEntry)

	go func(sink chan<- LogEntry) {
		// Deferred so that the close runs only after s.err has been set.
		defer close(sink)

		s.err = Poll(ctx, sink, s.apiClient, opts)
	}(entries)

	return entries
}
// Err reports the error stored by the goroutine started in Stream, i.e. the
// value Poll returned. It is nil until such a value has been stored.
func (s *pollingStream) Err() error {
	lastErr := s.err

	return lastErr
}
// Poll repeatedly calls client.GetAppLogs for the app, region and VM named in
// opts and forwards every returned entry to out as a LogEntry.
//
// Poll does not close out and never returns nil; it loops until one of the
// following happens, returning the corresponding error:
//
//   - ctx is done (either reported by GetAppLogs as context.Canceled or
//     context.DeadlineExceeded, or observed while delivering an entry);
//   - GetAppLogs fails with a not-authenticated or not-found error;
//   - GetAppLogs fails ten times in a row.
//
// Any other GetAppLogs failure is retried after a delay that doubles on each
// consecutive failure. An empty batch is followed by a short fixed delay, and
// a non-empty batch is followed immediately by the next request.
func Poll(ctx context.Context, out chan<- LogEntry, client *api.Client, opts *LogOptions) error {
	const (
		minWait = time.Millisecond << 6 // 64ms
		maxWait = minWait << 6          // 4096ms
	)

	var (
		errorCount int
		nextToken  string
		waitFor    = minWait
	)

	for {
		// waitFor == minWait (the initial state) and waitFor == 0 (after a
		// non-empty batch) both mean "fetch again right away".
		if waitFor > minWait {
			// NOTE(review): pause.For is presumed to return early once ctx
			// is done; the GetAppLogs call below then surfaces the ctx error.
			pause.For(ctx, waitFor)
		}

		entries, token, err := client.GetAppLogs(ctx, opts.AppName, nextToken, opts.RegionCode, opts.VMID)
		if err != nil {
			errorCount++

			switch {
			case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
				return err
			case api.IsNotAuthenticatedError(err), api.IsNotFoundError(err):
				return err
			case errorCount > 9:
				return err
			}

			// Transient failure: retry with a growing delay. waitFor is 0
			// after a successful batch, and doubling 0 stays 0, so floor it
			// at minWait first; otherwise every retry would fire with no
			// delay at all.
			if waitFor < minWait {
				waitFor = minWait
			}
			waitFor = backoff(waitFor, maxWait)

			continue
		}
		errorCount = 0

		if len(entries) == 0 {
			waitFor = backoff(minWait, maxWait)

			continue
		}
		waitFor = 0

		// Keep the previous token when the response does not carry one.
		if token != "" {
			nextToken = token
		}

		for _, entry := range entries {
			logEntry := LogEntry{
				Instance:  entry.Instance,
				Level:     entry.Level,
				Message:   entry.Message,
				Region:    entry.Region,
				Timestamp: entry.Timestamp,
				Meta:      entry.Meta,
			}

			// Deliver the entry, but give up as soon as ctx is done so that
			// a consumer which stopped receiving cannot block this goroutine
			// forever.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- logEntry:
			}
		}
	}
}
// backoff returns twice current, capped at max.
//
// The cap is applied to the doubled value, so the result never exceeds max
// for a positive max. The comparison is made against max/2 rather than
// against current<<1, which keeps it correct even when doubling current
// would overflow time.Duration.
func backoff(current, max time.Duration) time.Duration {
	if current > max/2 {
		return max
	}

	return current << 1
}