/
request.go
88 lines (77 loc) · 2.53 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package binance
import (
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/rs/zerolog/log"
"github.com/tidwall/gjson"
)
// http get and read body as bytes
func Get(baseURL string, params url.Values) ([]uint8, error) {
resp, err := http.Get(baseURL + params.Encode())
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
// http get and read body as json
func GetJSON(baseURL string, params url.Values) (*gjson.Result, error) {
body, err := Get(baseURL, params)
if err != nil {
return nil, err
}
result := gjson.ParseBytes(body)
return &result, nil
}
// Interval, Margin, Delay are in ms
// `decay` is the muliple at which the delay decreases.
type TimedFetchOptions struct {
Interval,
Margin, // starts the requests a bit earlier in case their emission schedule is shorter than advertised
Delay,
Decay int // decay factor for each round. Affects how fast it syncs to remote emission at the cost of more requests.
}
// timedFetch makes requests to API endpoint that updates every expected
// `interval`, resistant to clock drift by 1 `margin` ahead and n `delay`
// behind.
// Algo: 1m 1m 1m 1m. If lastVal != val, 4m. 15s 15s 15s 15s. If lastVal != val, 4m.59s. 1s 1s 1s 1s. If lastVal != val, 4m.59.5s
func TimedFetch(baseURL string, params url.Values, options TimedFetchOptions, callback func(*gjson.Result, error)) {
delay := options.Delay
currentDuration := 0
ticker := time.NewTicker(time.Duration(options.Delay) * time.Millisecond)
defer ticker.Stop()
var lastVal string
for ; true; <-ticker.C {
jsonData, err := GetJSON(baseURL, params)
if err != nil {
callback(nil, err)
}
result := jsonData.Array()
val := result[len(result)-1]
if val.Raw == lastVal {
if currentDuration != delay {
ticker.Stop()
ticker = time.NewTicker(time.Duration(delay) * time.Millisecond)
currentDuration = delay
}
log.Info().Str("time", time.Now().String()).Int("delay", delay)
continue
}
// data updated, so it must have updated within the last ticker shortDuration
ticker.Stop()
ticker = time.NewTicker(time.Duration(options.Interval-delay-options.Margin) * time.Millisecond) // -550 gives us 550 ms of margin of clock skew (speedup)
currentDuration = options.Interval - delay - options.Margin
log.Info().Str("time", time.Now().String()).Int("delay", options.Interval-delay-options.Margin)
delay = max(delay/options.Decay, 100) // minimum of 100ms between requests
lastVal = val.Raw
callback(&val, nil)
}
}
func max(a, b int) int {
if a > b {
return a
}
return b
}