-
Notifications
You must be signed in to change notification settings - Fork 307
/
uploader.go
187 lines (153 loc) · 5.14 KB
/
uploader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
//go:generate mockgen -destination=../../mocks/services/debugger/uploader.go -package mock_debugger github.com/rudderlabs/rudder-server/services/debugger Transformer,UploaderI
package debugger
import (
"bytes"
"context"
"net/http"
"sync"
"time"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
)
var (
	// pkgLogger is the package-scoped logger. It is assigned in init as a
	// "debugger" child logger.
	pkgLogger logger.Logger
	// Http is the request factory uploadEvents uses to build outgoing requests.
	// NOTE(review): presumably a package variable so tests can substitute a
	// mock implementation — confirm against the test suite.
	Http sysUtils.HttpI = sysUtils.NewHttp()
)
// UploaderI is the interface New returns; *Uploader is the implementation in
// this file. The per-method notes below describe that implementation.
type UploaderI interface {
	// Start launches the background goroutines that buffer recorded events
	// and periodically upload them.
	Start()
	// Stop closes the intake channel and waits for the flushing goroutine to
	// return.
	Stop()
	// RecordEvent hands a single event to the uploader for buffering.
	RecordEvent(data interface{})
}
// Transformer converts a batch of buffered events into the request payload.
type Transformer interface {
	// Transform serializes data into the bytes sent as the request body.
	// uploadEvents passes the flushed batch (a []interface{}) as data and
	// abandons the upload when a non-nil error is returned.
	Transform(data interface{}) ([]byte, error)
}
// Uploader buffers recorded events in memory and uploads them in batches to
// url. Events arrive through eventBatchChannel, are accumulated in
// eventBuffer by handleEvents, and are drained by flushEvents.
type Uploader struct {
	// url is the endpoint uploadEvents POSTs each batch to.
	url string
	// transformer serializes a batch into the request body.
	transformer Transformer
	// eventBatchChannel carries events from RecordEvent to handleEvents.
	// New creates it unbuffered; Stop closes it.
	eventBatchChannel chan interface{}
	// eventBufferLock guards eventBuffer, which is written by handleEvents
	// and drained by flushEvents on different goroutines.
	eventBufferLock sync.RWMutex
	// eventBuffer holds events waiting to be uploaded, oldest first.
	eventBuffer []interface{}
	// Client executes the HTTP requests built in uploadEvents.
	Client sysUtils.HTTPClientI
	// maxBatchSize caps the number of events taken per flush, maxRetry caps
	// the number of attempts when the request fails at the transport level,
	// and maxESQueueSize caps eventBuffer (the oldest event is dropped once
	// the cap is reached). All three are bound to config keys in Setup.
	maxBatchSize, maxRetry, maxESQueueSize int
	// batchTimeout is the wait between flushes; retrySleep is the pause
	// between upload attempts. Both are bound to config keys in Setup.
	batchTimeout, retrySleep time.Duration
	// bgWaitGroup tracks the flushEvents goroutine so Stop can wait for it.
	bgWaitGroup sync.WaitGroup
}
// init sets up the package logger as a "debugger" child of a new logger.
func init() {
	pkgLogger = logger.NewLogger().Child("debugger")
}
// Setup registers the uploader's tunables with the config package, binding
// each config key to the matching field on the receiver. New calls it once
// during construction.
// NOTE(review): the literal arguments look like (default value, target,
// hot-reloadable flag, unit/scale, key) — confirm against the config package
// signatures before relying on the defaults shown here.
func (uploader *Uploader) Setup() {
	// Number of events that are batched before sending events to control plane
	config.RegisterIntConfigVariable(32, &uploader.maxBatchSize, true, 1, "Debugger.maxBatchSize")
	// Bound on the in-memory buffer; handleEvents drops the oldest event once
	// the buffer reaches this size.
	config.RegisterIntConfigVariable(1024, &uploader.maxESQueueSize, true, 1, "Debugger.maxESQueueSize")
	// Bound on upload attempts when the HTTP call itself fails.
	config.RegisterIntConfigVariable(3, &uploader.maxRetry, true, 1, "Debugger.maxRetry")
	// Wait between flushes in flushEvents.
	config.RegisterDurationConfigVariable(2, &uploader.batchTimeout, true, time.Second, "Debugger.batchTimeoutInS")
	// Pause between upload attempts in uploadEvents.
	config.RegisterDurationConfigVariable(100, &uploader.retrySleep, true, time.Millisecond, "Debugger.retrySleepInMS")
}
// New builds an Uploader that sends batches produced by transformer to url,
// registers its config-driven settings via Setup, and returns it as an
// UploaderI. The returned uploader is idle until Start is called.
func New(url string, transformer Transformer) UploaderI {
	httpTimeout := config.GetDuration("HttpClient.debugger.timeout", 30, time.Second)

	// bgWaitGroup is left out on purpose: its zero value is ready to use.
	uploader := &Uploader{
		url:               url,
		transformer:       transformer,
		eventBatchChannel: make(chan interface{}),
		eventBuffer:       make([]interface{}, 0),
		Client:            &http.Client{Timeout: httpTimeout},
	}
	uploader.Setup()
	return uploader
}
// Start launches the uploader's two background goroutines:
//
//   - a collector that runs handleEvents until the intake channel is closed
//     and then cancels the flusher's context;
//   - a flusher that runs flushEvents until that context is cancelled.
//
// Only the flusher is tracked by bgWaitGroup. It cannot return before the
// collector has cancelled the context, so waiting on it covers both.
func (uploader *Uploader) Start() {
	ctx, stopFlushing := context.WithCancel(context.Background())

	collect := func() {
		uploader.handleEvents()
		stopFlushing()
	}
	flush := func() {
		uploader.flushEvents(ctx)
		uploader.bgWaitGroup.Done()
	}

	rruntime.Go(collect)
	uploader.bgWaitGroup.Add(1)
	rruntime.Go(flush)
}
// Stop shuts the uploader down: it closes the intake channel, which ends
// handleEvents and in turn cancels the flusher, then blocks until the flusher
// goroutine has returned.
//
// Closing the channel means Stop must be called at most once, and RecordEvent
// must not be called afterwards; either would panic.
func (uploader *Uploader) Stop() {
	close(uploader.eventBatchChannel)
	uploader.bgWaitGroup.Wait()
}
// RecordEvent sends data on eventBatchChannel, from which handleEvents moves
// it into the in-memory buffer.
//
// The channel created by New is unbuffered, so this call blocks until
// handleEvents receives the value. Sending after Stop has closed the channel
// panics.
func (uploader *Uploader) RecordEvent(data interface{}) {
	uploader.eventBatchChannel <- data
}
// uploadEvents serializes eventBuffer with the configured transformer and
// POSTs the result as JSON to uploader.url, authenticating with the workspace
// token.
//
// Only transport-level failures (Client.Do returning an error) are retried:
// up to maxRetry attempts in total, sleeping retrySleep between them. A
// non-200 response is logged together with its body and the batch is dropped
// without a retry. The function reports nothing to the caller; every failure
// path ends in a log line.
func (uploader *Uploader) uploadEvents(eventBuffer []interface{}) {
	rawJSON, err := uploader.transformer.Transform(eventBuffer)
	if err != nil {
		// The batch is lost at this point, so leave a trace of why.
		pkgLogger.Errorf("[Uploader] Failed to transform events. Err: %v", err)
		return
	}

	url := uploader.url
	retryCount := 1
	var resp *http.Response
	for {
		// The request is rebuilt on every attempt because its body reader is
		// consumed by the previous Do call.
		req, err := Http.NewRequest("POST", url, bytes.NewBuffer(rawJSON))
		if err != nil {
			pkgLogger.Errorf("[Uploader] Failed to create new http request. Err: %v", err)
			return
		}
		req.Header.Set("Content-Type", "application/json;charset=UTF-8")
		req.SetBasicAuth(config.GetWorkspaceToken(), "")

		resp, err = uploader.Client.Do(req)
		if err == nil {
			break
		}

		pkgLogger.Error("Config Backend connection error", err)
		if retryCount >= uploader.maxRetry {
			pkgLogger.Errorf("Max retries exceeded trying to connect to config backend")
			return
		}
		retryCount++
		time.Sleep(uploader.retrySleep)
	}
	// Registered once, after the loop, for the single response we ended up with.
	defer httputil.CloseResponse(resp)

	if resp.StatusCode != http.StatusOK {
		// Read the payload before logging: formatting resp.Body itself with %v
		// prints the reader's internal representation, not the response text.
		var body bytes.Buffer
		if resp.Body != nil {
			// Best effort — a partial body is still useful in an error log,
			// so a read error is deliberately ignored here.
			_, _ = body.ReadFrom(resp.Body)
		}
		pkgLogger.Errorf("[Uploader] Response Error from Config Backend: Status: %v, Body: %v ", resp.StatusCode, body.String())
	}
}
// handleEvents drains eventBatchChannel into eventBuffer until the channel is
// closed. The buffer is bounded by maxESQueueSize: once it is full, the
// oldest entry is discarded to make room for the incoming one.
func (uploader *Uploader) handleEvents() {
	for incoming := range uploader.eventBatchChannel {
		uploader.eventBufferLock.Lock()

		pending := uploader.eventBuffer
		if len(pending) >= uploader.maxESQueueSize {
			// Clear the slot before re-slicing so the backing array does not
			// keep the evicted event reachable.
			pending[0] = nil
			pending = pending[1:]
		}
		uploader.eventBuffer = append(pending, incoming)

		uploader.eventBufferLock.Unlock()
	}
}
// flushEvents periodically uploads buffered events until ctx is cancelled.
//
// Each pass waits for batchTimeout (or cancellation), takes up to
// maxBatchSize events off the front of eventBuffer while holding the lock,
// and uploads them with the lock released. After cancellation exactly one
// more pass runs, so at most one final batch is sent; anything still buffered
// beyond that is not uploaded.
func (uploader *Uploader) flushEvents(ctx context.Context) {
	for {
		// Wake up on whichever comes first: cancellation or the batch timer.
		select {
		case <-ctx.Done():
		case <-time.After(uploader.batchTimeout):
		}

		var batch []interface{}

		uploader.eventBufferLock.Lock()
		count := len(uploader.eventBuffer)
		if count > uploader.maxBatchSize {
			count = uploader.maxBatchSize
		}
		if count > 0 {
			batch, uploader.eventBuffer = uploader.eventBuffer[:count], uploader.eventBuffer[count:]
		}
		uploader.eventBufferLock.Unlock()

		// The upload runs outside the lock so handleEvents can keep buffering.
		if count > 0 {
			uploader.uploadEvents(batch)
		}

		if ctx.Err() != nil {
			return
		}
	}
}