/
ha.go
104 lines (87 loc) · 2.76 KB
/
ha.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package rockset
import (
"context"
"sync"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/rockset/rockset-go-client/openapi"
"github.com/rockset/rockset-go-client/option"
)
// Querier is implemented by anything that can execute a query string with a
// set of query options and hand back the query response. HA only needs this
// single method from the clients it fans out to.
type Querier interface {
	// Query executes query using the supplied options and returns the
	// response, or an error when the query could not be completed.
	Query(ctx context.Context, query string, options ...option.QueryOption) (openapi.QueryResponse, error)
}
// HA groups several Querier clients so that one query can be sent to all of
// them; see Query for how the responses are combined.
type HA struct {
	// clients holds every Querier a call to Query is sent to.
	clients []Querier
}
// NewHA returns an HA that sends its queries to the given clients.
//
// The clients are copied into the HA, so a caller that passes an existing
// slice (NewHA(qs...)) can keep modifying that slice without changing which
// clients the HA uses.
func NewHA(client ...Querier) *HA {
	// append onto a nil slice copies the elements and keeps nil for no clients
	return &HA{clients: append([]Querier(nil), client...)}
}
// Query sends query to every client of the HA in parallel and returns the
// response of the first client that answers successfully. The requests that
// are still in flight at that point are cancelled through their context.
//
// On success the returned error slice is nil. When ctx is cancelled, or when
// every client fails, the zero openapi.QueryResponse is returned together with
// the errors collected so far.
//
// NOTE(review): an HA without clients yields a zero response and a nil error
// slice, which a caller cannot tell apart from success — confirm whether that
// case should be reported as an error.
func (ha *HA) Query(ctx context.Context, query string, options ...option.QueryOption) (openapi.QueryResponse, []error) {
	log := zerolog.Ctx(ctx)

	// All requests run on a sub-context, so the ones we no longer need can be
	// cancelled once a client has answered, and on every return path.
	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Each client sends at most one value, so with this capacity a send never blocks.
	resultCh := make(chan openapi.QueryResponse, len(ha.clients))
	errorCh := make(chan error, len(ha.clients))

	var wg sync.WaitGroup
	wg.Add(len(ha.clients))
	for idx, c := range ha.clients {
		go func(i int, cl Querier) {
			defer wg.Done()

			res, err := cl.Query(subCtx, query, options...)
			if err != nil {
				if subCtx.Err() != nil {
					// another client answered first or the caller gave up,
					// so this failure is expected and not worth an error entry
					log.Debug().Err(err).Int("idx", i).Msg("query cancelled")
				} else {
					log.Error().Err(err).Int("idx", i).Msg("failed to query")
				}
				errorCh <- err
				return
			}

			log.Trace().Int("idx", i).Msg("got query response")
			resultCh <- res
			// we have an answer, so abort the requests that are still pending
			cancel()
		}(idx, c) // avoid using the loop variables as they mutate each iteration
	}

	// This goroutine waits for all parallel query goroutines to complete, which
	// happens once the first response has arrived and the rest got cancelled,
	// or when all requests have failed. It then closes both channels.
	go func() {
		wg.Wait()
		close(resultCh)
		close(errorCh)
	}()

	// returnFirst watches the caller's context rather than subCtx: subCtx is
	// cancelled as soon as a response is available, and the select in
	// returnFirst could then pick the cancellation over the waiting response.
	return returnFirst(ctx, resultCh, errorCh)
}
// returnFirst waits until the outcome of a fan-out query is decided and returns
// one of:
//   - the first response received on resultCh, with a nil error slice;
//   - the zero response and the errors collected so far plus ctx.Err(), once
//     ctx is done;
//   - the zero response and all collected errors, once both channels have been
//     closed without delivering a response.
//
// The sender must close both channels when nothing more will be sent. Errors
// that arrive after the first response are only logged; this is done by a
// goroutine that ends when errorCh is closed, so the response is returned
// without waiting for the remaining requests.
func returnFirst(ctx context.Context, resultCh chan openapi.QueryResponse,
	errorCh chan error) (openapi.QueryResponse, []error) {
	var errs []error

	// A closed channel is set to nil so select stops picking it. We only give
	// up when BOTH are closed: select chooses randomly among ready cases, so
	// errorCh can be seen closed while a response is still buffered in resultCh.
	for resultCh != nil || errorCh != nil {
		select {
		case res, ok := <-resultCh:
			if !ok {
				log.Warn().Msg("receive from closed results channel")
				resultCh = nil
				continue
			}

			// log any error we got BEFORE the successful response
			for _, err := range errs {
				log.Error().Err(err).Msg("before the response")
			}

			// log any error we got AFTER the successful response, without
			// holding up the caller; a nil errorCh is already fully drained
			if errorCh != nil {
				go func() {
					for err := range errorCh {
						// NOTE(review): a cancellation wrapped in another
						// error is not recognised by this comparison
						if err != context.Canceled {
							log.Error().Err(err).Msg("after the response")
						}
					}
				}()
			}

			return res, nil
		case <-ctx.Done():
			log.Error().Err(ctx.Err()).Msg("context cancelled")
			errs = append(errs, ctx.Err())
			return openapi.QueryResponse{}, errs
		case err, ok := <-errorCh:
			if !ok {
				// no more errors will arrive, but a response may still be waiting
				errorCh = nil
				continue
			}
			errs = append(errs, err)
		}
	}

	// every sender is done and none of them delivered a response
	return openapi.QueryResponse{}, errs
}