forked from saiya/dsps
/
client.go
119 lines (97 loc) · 3.14 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package outgoing
import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
"time"
"golang.org/x/xerrors"
"github.com/m3dev/dsps/server/domain"
"github.com/m3dev/dsps/server/logger"
"github.com/m3dev/dsps/server/sentry"
"github.com/m3dev/dsps/server/telemetry"
)
// Client is an outgoing-webhook client.
type Client interface {
Send(ctx context.Context, msg domain.Message) error
// Shutdown this client.
// This method wait until all in-flight request ends.
Close(ctx context.Context)
String() string
}
type clientImpl struct {
_isClosed int32 // 0: available, 1: closing
method string
url string
headers map[string]string
timeout time.Duration
retry retry
h *http.Client // Note that Client does not own this object, ClientTemplate owns.
telemetry *telemetry.Telemetry
sentry sentry.Sentry
}
func newClientImpl(tpl *clientTemplate, tplEnv domain.TemplateStringEnv) (Client, error) {
c := &clientImpl{
_isClosed: 0,
method: tpl.Method,
headers: make(map[string]string, len(tpl.Headers)),
timeout: tpl.Timeout.Duration,
retry: newRetry(&tpl.Retry),
h: tpl.h,
telemetry: tpl.telemetry,
sentry: tpl.sentry,
}
var err error
c.url, err = tpl.URL.Execute(tplEnv)
if err != nil {
return nil, xerrors.Errorf(`failed to expand template of webhook URL "%s": %w`, tpl.URL, err)
}
for name, valueTpl := range tpl.Headers {
c.headers[name], err = valueTpl.Execute(tplEnv)
if err != nil {
return nil, xerrors.Errorf(`failed to expand template of webhook header "%s", "%s": %w`, name, valueTpl, err)
}
}
return c, nil
}
func (c *clientImpl) String() string {
return fmt.Sprintf("%s %s", c.method, c.url)
}
func (c *clientImpl) Send(ctx context.Context, msg domain.Message) error {
if c.isClosed() {
return xerrors.Errorf("outgoing-webhook client already closed")
}
logger.Of(ctx).Debugf(logger.CatOutgoingWebhook, "sending outgoing webhook (channel: %s, messageID: %s) to %s", msg.ChannelID, msg.MessageID, c.url)
body, err := encodeWebhookBody(ctx, msg)
if err != nil {
return xerrors.Errorf("failed to generate outgoing webhook body: %w", err)
}
return c.retry.Do(ctx, c.sentry, fmt.Sprintf("outgoing-webhook to %s", c.url), func() (*http.Request, *http.Response, error) {
req, err := http.NewRequestWithContext(ctx, c.method, c.url, strings.NewReader(body))
if err != nil {
return req, nil, err
}
req.Header.Set("Content-Type", "application/json")
for name, value := range c.headers {
// Should overwrite default headers, thus use Set() rather than Add()
req.Header.Set(name, value)
}
ctx, end := c.telemetry.StartHTTPSpan(ctx, false, req)
defer end()
res, err := c.h.Do(req)
if res != nil {
logger.Of(ctx).Debugf(logger.CatOutgoingWebhook, "received outgoing webhook response (%s %d, contentLength: %d)", res.Proto, res.StatusCode, res.ContentLength)
c.telemetry.SetHTTPResponseAttributes(ctx, res.StatusCode, res.ContentLength)
}
return req, res, err
})
}
func (c *clientImpl) Close(ctx context.Context) {
if atomic.CompareAndSwapInt32(&c._isClosed, 0, 1) {
return
}
}
func (c *clientImpl) isClosed() bool {
return atomic.LoadInt32(&c._isClosed) != 0
}