-
Notifications
You must be signed in to change notification settings - Fork 25
/
invoker.go
101 lines (81 loc) · 2.12 KB
/
invoker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package types
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"github.com/pkg/errors"
)
// Invoker posts messages to functions through a gateway and publishes the
// outcome of every attempt on its Responses channel.
type Invoker struct {
// PrintResponse is stored by NewInvoker but is not read by Invoke;
// presumably consumers of Responses use it to decide whether to log
// response bodies (TODO confirm against callers).
PrintResponse bool
// Client is the HTTP client Invoke uses for every request.
Client *http.Client
// GatewayURL is the gateway base URL; Invoke appends "/function/<name>" to it.
GatewayURL string
// Responses is the channel on which Invoke sends its results. NewInvoker
// creates it unbuffered, so each send blocks until a receiver is ready.
Responses chan InvokerResponse
}
// InvokerResponse is the result of one invocation attempt, as sent on
// Invoker.Responses. When Invoke reports a failure it sets only Error and
// leaves every other field at its zero value.
type InvokerResponse struct {
// Body points to the bytes read from the HTTP response body.
Body *[]byte
// Header points to the HTTP response headers.
Header *http.Header
// Status is the HTTP status code of the response.
Status int
// Error is non-nil when the invocation failed.
Error error
// Topic is the topic that was passed to Invoke.
Topic string
// Function is the matched function that was invoked.
Function string
}
// NewInvoker returns an Invoker that sends requests to gatewayURL with client.
// printResponse is recorded on the returned value as PrintResponse.
func NewInvoker(gatewayURL string, client *http.Client, printResponse bool) *Invoker {
	invoker := new(Invoker)
	invoker.GatewayURL = gatewayURL
	invoker.Client = client
	invoker.PrintResponse = printResponse
	// Unbuffered: a send from Invoke blocks until a receiver takes the value.
	invoker.Responses = make(chan InvokerResponse)
	return invoker
}
// Invoke triggers every function that topicMap associates with topic by
// POSTing message to "<GatewayURL>/function/<name>" on the API Gateway.
//
// One InvokerResponse is sent on i.Responses per matched function: it carries
// Body, Status, Header, Function and Topic when invokefunction reports no
// error, and only Error otherwise. A nil or empty message is rejected with a
// single error response and no function is invoked.
//
// Every send on i.Responses blocks until a receiver takes the value when the
// channel is unbuffered (as NewInvoker creates it), so callers must be
// draining the channel while Invoke runs.
func (i *Invoker) Invoke(topicMap *TopicMap, topic string, message *[]byte) {
	// Reject a nil pointer as well as an empty payload, and stop here: falling
	// through would still POST an empty body to every matched function.
	if message == nil || len(*message) == 0 {
		i.Responses <- InvokerResponse{
			Error: fmt.Errorf("no message to send"),
		}
		return
	}

	matchedFunctions := topicMap.Match(topic)
	for _, matchedFunction := range matchedFunctions {
		log.Printf("Invoke function: %s", matchedFunction)

		gwURL := fmt.Sprintf("%s/function/%s", i.GatewayURL, matchedFunction)
		// A fresh reader per function so each request sends the full payload.
		reader := bytes.NewReader(*message)

		body, statusCode, header, doErr := invokefunction(i.Client, gwURL, reader)
		if doErr != nil {
			i.Responses <- InvokerResponse{
				Error: errors.Wrap(doErr, fmt.Sprintf("unable to invoke %s", matchedFunction)),
			}
			continue
		}

		i.Responses <- InvokerResponse{
			Body:     body,
			Status:   statusCode,
			Header:   header,
			Function: matchedFunction,
			Topic:    topic,
		}
	}
}
// invokefunction POSTs the contents of reader to gwURL using c and returns the
// response body, the HTTP status code and the response headers.
//
// On any failure (the request cannot be built, the round trip fails, or the
// response body cannot be read) it returns a nil body,
// http.StatusServiceUnavailable, nil headers and the underlying error. On
// success the returned error is nil; the body pointer is nil only if the
// response carried no body.
func invokefunction(c *http.Client, gwURL string, reader io.Reader) (*[]byte, int, *http.Header, error) {
	httpReq, reqErr := http.NewRequest(http.MethodPost, gwURL, reader)
	if reqErr != nil {
		// For example a malformed gateway URL. httpReq is nil here, so bail
		// out before it is dereferenced.
		return nil, http.StatusServiceUnavailable, nil, reqErr
	}
	if httpReq.Body != nil {
		defer httpReq.Body.Close()
	}

	res, doErr := c.Do(httpReq)
	if doErr != nil {
		return nil, http.StatusServiceUnavailable, nil, doErr
	}

	var body *[]byte
	if res.Body != nil {
		defer res.Body.Close()

		bytesOut, readErr := ioutil.ReadAll(res.Body)
		if readErr != nil {
			log.Printf("Error reading body")
			// Surface the read failure itself; doErr is nil at this point and
			// returning it would make the caller treat this as a success.
			return nil, http.StatusServiceUnavailable, nil, readErr
		}
		body = &bytesOut
	}

	return body, res.StatusCode, &res.Header, nil
}