-
Notifications
You must be signed in to change notification settings - Fork 351
/
sender.go
118 lines (104 loc) · 3.16 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package stats
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/treeverse/lakefs/pkg/logging"
)
var (
ErrSendError = errors.New("stats: send error")
ErrNoInstallationID = fmt.Errorf("installation ID is missing: %w", ErrSendError)
)
type Sender interface {
SendEvent(ctx context.Context, installationID, processID string, m []Metric) error
UpdateMetadata(ctx context.Context, m Metadata) error
}
type TimeFn func() time.Time
type HTTPSender struct {
timeFunc TimeFn
addr string
}
func NewHTTPSender(addr string, timeFunc TimeFn) *HTTPSender {
return &HTTPSender{
timeFunc: timeFunc,
addr: addr,
}
}
func (s *HTTPSender) UpdateMetadata(ctx context.Context, m Metadata) error {
if len(m.InstallationID) == 0 {
return ErrNoInstallationID
}
serialized, err := json.MarshalIndent(m, "", " ")
if err != nil {
return fmt.Errorf("failed to serialize account metadata: %w", err)
}
req, err := http.NewRequest(http.MethodPost, s.addr+"/installation", bytes.NewBuffer(serialized))
if err != nil {
return fmt.Errorf("could not create HTTP request: %s: %w", err, ErrSendError)
}
req = req.WithContext(ctx)
res, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("could not make HTTP request: %s: %w", err, ErrSendError)
}
_ = res.Body.Close()
if res.StatusCode != http.StatusCreated {
return fmt.Errorf("bad status code received. status=%d: %w", res.StatusCode, ErrSendError)
}
return nil
}
func (s *HTTPSender) SendEvent(ctx context.Context, installationID, processID string, metrics []Metric) error {
event := &InputEvent{
InstallationID: installationID,
ProcessID: processID,
Time: s.timeFunc().Format(time.RFC3339),
Metrics: metrics,
}
serialized, err := json.MarshalIndent(event, "", " ")
if err != nil {
return fmt.Errorf("could not serialize event: %s: %w", err, ErrSendError)
}
req, err := http.NewRequest(http.MethodPost, s.addr+"/events", bytes.NewBuffer(serialized))
if err != nil {
return fmt.Errorf("could not create HTTP request: %s: %w", err, ErrSendError)
}
req = req.WithContext(ctx)
res, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("could not make HTTP request: %s: %w", err, ErrSendError)
}
defer func() {
_ = res.Body.Close()
}()
if res.StatusCode != http.StatusCreated {
return fmt.Errorf("bad status code received. status=%d: %w", res.StatusCode, ErrSendError)
}
return nil
}
type dummySender struct{}
func (s *dummySender) SendEvent(_ context.Context, installationID, processID string, metrics []Metric) error {
if logging.Default().IsTracing() {
logging.Default().WithFields(logging.Fields{
"installation_id": installationID,
"process_id": processID,
"metrics": spew.Sdump(metrics),
}).Trace("dummy sender received metrics")
}
return nil
}
func (s *dummySender) UpdateMetadata(ctx context.Context, m Metadata) error {
if logging.Default().IsTracing() {
logging.Default().WithFields(logging.Fields{
"metadata": spew.Sdump(m),
}).Trace("dummy sender received metadata")
}
return nil
}
func NewDummySender() Sender {
return &dummySender{}
}