forked from luraproject/lura
/
concurrent.go
75 lines (63 loc) · 1.67 KB
/
concurrent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package proxy
import (
"context"
"errors"
"time"
"github.com/devopsfaith/krakend/config"
)
// NewConcurrentMiddleware creates a proxy middleware that sends the same request to the
// next proxy remote.ConcurrentCalls times concurrently and returns the first complete
// response it receives.
//
// All the calls share a context derived from the caller's one, with a timeout set to
// 75% of remote.Timeout. When no complete response arrives, the last response read
// (possibly incomplete or nil) is returned together with the last error read. If the
// caller's context ends before anything could be read, the context error is returned.
//
// It panics with ErrTooManyProxies when remote.ConcurrentCalls is 1 and, once the
// returned middleware is applied, when more than one next proxy is supplied.
func NewConcurrentMiddleware(remote *config.Backend) Middleware {
	if remote.ConcurrentCalls == 1 {
		panic(ErrTooManyProxies)
	}
	serviceTimeout := time.Duration(75*remote.Timeout.Nanoseconds()/100) * time.Nanosecond

	return func(next ...Proxy) Proxy {
		// NOTE(review): an empty next is not rejected here; next[0] below would
		// panic with an index out of range when the proxy is invoked.
		if len(next) > 1 {
			panic(ErrTooManyProxies)
		}

		return func(ctx context.Context, request *Request) (*Response, error) {
			localCtx, cancel := context.WithTimeout(ctx, serviceTimeout)
			// Deferred so the pending calls are cancelled on every exit path.
			defer cancel()

			// Each call performs exactly one send, so these buffers guarantee that
			// no goroutine blocks after this function has returned.
			results := make(chan *Response, remote.ConcurrentCalls)
			failed := make(chan error, remote.ConcurrentCalls)

			for i := 0; i < remote.ConcurrentCalls; i++ {
				go processConcurrentCall(localCtx, next[0], request, results, failed)
			}

			var response *Response
			var err error

			for i := 0; i < remote.ConcurrentCalls; i++ {
				select {
				case response = <-results:
					if response != nil && response.IsComplete {
						return response, nil
					}
				case err = <-failed:
				case <-ctx.Done():
				}
			}

			if response == nil && err == nil {
				// Nothing was read from either channel, so every iteration was
				// consumed by ctx.Done(). Report the reason instead of (nil, nil).
				err = ctx.Err()
			}
			return response, err
		}
	}
}
// errNullResult is the error reported when a proxy returns neither a response nor an error.
var errNullResult = errors.New("invalid response")

// processConcurrentCall executes next once with the given request and reports the
// outcome with exactly one channel send:
//
//   - the error returned by next is sent on failed;
//   - errNullResult is sent on failed when next returns a nil response and a nil error;
//   - otherwise the response is sent on out, unless ctx is done first, in which case
//     ctx.Err() is sent on failed.
//
// next receives a cancellable child of ctx that is cancelled as soon as this function
// returns. The sends on failed are blocking, so the caller must provide channels with
// enough capacity (or an active receiver).
func processConcurrentCall(ctx context.Context, next Proxy, request *Request, out chan<- *Response, failed chan<- error) {
	localCtx, cancel := context.WithCancel(ctx)
	// Deferred so the child context is released on every exit path, including a
	// panic raised by next.
	defer cancel()

	result, err := next(localCtx, request)
	if err != nil {
		failed <- err
		return
	}
	if result == nil {
		failed <- errNullResult
		return
	}

	select {
	case out <- result:
	case <-ctx.Done():
		failed <- ctx.Err()
	}
}