-
Notifications
You must be signed in to change notification settings - Fork 307
/
transformer.go
408 lines (357 loc) · 13.6 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
package transformer
//go:generate mockgen -destination=../../mocks/processor/transformer/mock_transformer.go -package=mocks_transformer github.com/rudderlabs/rudder-server/processor/transformer Transformer
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"runtime/trace"
"strconv"
"sync"
"time"
"github.com/cenkalti/backoff"
jsoniter "github.com/json-iterator/go"
"github.com/rudderlabs/rudder-server/config"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types"
)
// Identifiers for the transformation pipeline stages.
// NOTE(review): these are not referenced in this file; presumably callers use
// them as stage labels (e.g. in stats/reporting) — confirm at the call sites.
const (
	DestTransformerStage        = "dest_transformer"
	EventFilterStage            = "event_filter"
	TrackingPlanValidationStage = "trackingPlan_validation"
	UserTransformerStage        = "user_transformer"
)

// StatusCPDown is the non-standard HTTP status code the transformer replies
// with when its connection to the control plane is down. request keeps
// retrying (with exponential backoff, without limit) for as long as this
// status comes back.
const StatusCPDown = 809
// jsonfast is json-iterator configured for compatibility with encoding/json.
// It marshals the transformer request body and unmarshals the transformer
// response in request.
var jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary
// MetadataT is the per-event metadata exchanged with the transformer.
//
// It is serialized (using the json tags below) as the "metadata" of every
// outgoing TransformerEventT and decoded from the "metadata" of every
// TransformerResponseT. When a whole batch fails, request copies each input
// event's MetadataT into the synthesized failure response so the caller can
// correlate failures with the events it sent.
//
// Field order matters: it determines key order in the marshaled JSON.
// NOTE(review): TrackingPlanId does not follow Go's initialism convention
// (TrackingPlanID), but renaming it would break callers.
// NOTE(review): RecordID is untyped (interface{}); presumably its JSON type
// varies by source — confirm before relying on a concrete type.
type MetadataT struct {
	SourceID            string                            `json:"sourceId"`
	WorkspaceID         string                            `json:"workspaceId"`
	Namespace           string                            `json:"namespace"`
	InstanceID          string                            `json:"instanceId"`
	SourceType          string                            `json:"sourceType"`
	SourceCategory      string                            `json:"sourceCategory"`
	TrackingPlanId      string                            `json:"trackingPlanId"`
	TrackingPlanVersion int                               `json:"trackingPlanVersion"`
	SourceTpConfig      map[string]map[string]interface{} `json:"sourceTpConfig"`
	MergedTpConfig      map[string]interface{}            `json:"mergedTpConfig"`
	DestinationID       string                            `json:"destinationId"`
	JobRunID            string                            `json:"jobRunId"`
	JobID               int64                             `json:"jobId"`
	SourceBatchID       string                            `json:"sourceBatchId"`
	SourceJobID         string                            `json:"sourceJobId"`
	SourceJobRunID      string                            `json:"sourceJobRunId"`
	SourceTaskID        string                            `json:"sourceTaskId"`
	SourceTaskRunID     string                            `json:"sourceTaskRunId"`
	RecordID            interface{}                       `json:"recordId"`
	DestinationType     string                            `json:"destinationType"`
	MessageID           string                            `json:"messageId"`
	OAuthAccessToken    string                            `json:"oauthAccessToken"`
	// Set by user_transformer to indicate that the transformed event is part
	// of the group of input events identified by these messageIDs.
	MessageIDs              []string `json:"messageIds"`
	RudderID                string   `json:"rudderId"`
	ReceivedAt              string   `json:"receivedAt"`
	EventName               string   `json:"eventName"`
	EventType               string   `json:"eventType"`
	SourceDefinitionID      string   `json:"sourceDefinitionId"`
	DestinationDefinitionID string   `json:"destinationDefinitionId"`
}
// TransformerEventT is one input item of a transformer request: the event
// payload, its metadata and the destination it is being transformed for.
// request marshals a slice of these (with jsonfast) as the JSON request body.
// NOTE(review): Libraries is presumably the set of user-transformation
// libraries available to the transformation code — confirm with the
// transformer service.
type TransformerEventT struct {
	Message     types.SingularEventT       `json:"message"`
	Metadata    MetadataT                  `json:"metadata"`
	Destination backendconfig.DestinationT `json:"destination"`
	Libraries   []backendconfig.LibraryT   `json:"libraries"`
}
// HandleT is the handle for this class: an HTTP client for the transformer
// service that bounds how many batch requests are in flight at once.
// Obtain one with NewTransformer and call Setup before using it.
type HandleT struct {
	// sentStat is the "processor.transformer_sent" counter registered in Setup.
	sentStat stats.Measurement
	// receivedStat is the "processor.transformer_received" counter; Transform
	// adds the number of successful (status 200) responses to it.
	receivedStat stats.Measurement
	// cpDownGauge is the "processor.control_plane_down" gauge. request sets it
	// to 1 while the transformer answers StatusCPDown, and to 0 otherwise.
	cpDownGauge stats.Measurement
	logger      logger.Logger
	// Client performs the POSTs to the transformer. Setup only builds a
	// default client when this is nil, so a preconfigured client can be
	// assigned before Setup.
	Client *http.Client
	// guardConcurrency is a counting semaphore (capacity maxConcurrency, made
	// in Setup). Transform acquires a slot before each batch request and
	// releases it when the request finishes.
	guardConcurrency chan struct{}
}
// Transformer provides methods to transform events.
// HandleT implements it; the go:generate directive at the top of this file
// produces a gomock mock from this interface.
type Transformer interface {
	// Setup prepares the implementation for use; call it before Transform or Validate.
	Setup()
	// Transform sends clientEvents to url in batches of batchSize and returns
	// the successful and failed per-event responses.
	Transform(ctx context.Context, clientEvents []TransformerEventT, url string, batchSize int) ResponseT
	// Validate behaves like Transform but takes no context.
	Validate(clientEvents []TransformerEventT, url string, batchSize int) ResponseT
}
// NewTransformer returns a zero-valued *HandleT.
//
// The handle is not usable until Setup has run: before that its logger,
// stats, HTTP client and concurrency semaphore are all unset, and Transform
// would block forever trying to acquire a slot on the nil semaphore channel.
func NewTransformer() *HandleT {
	handle := new(HandleT)
	return handle
}
// Package-level settings bound to the config registry by loadConfig (via
// Init), plus the package logger.
var (
	// Concurrency and HTTP connection-pool limits, consumed by Setup.
	maxConcurrency, maxHTTPConnections, maxHTTPIdleConnections int

	// maxRetry bounds the per-request retries performed by doPost.
	maxRetry int

	// NOTE(review): retrySleep is registered in loadConfig but is not read
	// anywhere in this file — confirm whether it is still needed.
	retrySleep time.Duration

	// timeoutDuration is the HTTP client timeout Setup applies to the default client.
	timeoutDuration time.Duration

	// pkgLogger is created by Init and copied into each handle by Setup.
	pkgLogger logger.Logger
)
// Init registers this package's configuration variables and creates the
// package logger (a "processor" -> "transformer" child logger). Run it before
// Setup, which copies pkgLogger and reads the registered settings.
func Init() {
	loadConfig()
	root := logger.NewLogger()
	pkgLogger = root.Child("processor").Child("transformer")
}
// loadConfig binds the package tunables to the config registry.
//
// Each Register* call passes, in order: the default value, the variable to
// populate, whether the value may be hot-reloaded, a scale/unit factor, and
// the config key(s) to read.
// NOTE(review): argument meanings follow the config package's
// Register*ConfigVariable signature — confirm there if it changes.
func loadConfig() {
	const (
		reloadable = true
		fixed      = false
	)

	// Concurrency and HTTP connection-pool limits.
	config.RegisterIntConfigVariable(200, &maxConcurrency, fixed, 1, "Processor.maxConcurrency")
	config.RegisterIntConfigVariable(100, &maxHTTPConnections, fixed, 1, "Processor.maxHTTPConnections")
	config.RegisterIntConfigVariable(50, &maxHTTPIdleConnections, fixed, 1, "Processor.maxHTTPIdleConnections")

	// Retry policy; the default of 100 is scaled by time.Millisecond.
	config.RegisterIntConfigVariable(30, &maxRetry, reloadable, 1, "Processor.maxRetry")
	retrySleepKeys := []string{"Processor.retrySleep", "Processor.retrySleepInMS"}
	config.RegisterDurationConfigVariable(100, &retrySleep, reloadable, time.Millisecond, retrySleepKeys...)

	// Timeout of the transformer HTTP client built in Setup (30 x time.Second by default).
	config.RegisterDurationConfigVariable(30, &timeoutDuration, fixed, time.Second, "HttpClient.procTransformer.timeout")
}
// TransformerResponseT is one output item of a transformer request: either
// decoded from the transformer's JSON response, or synthesized by request
// (one per input event) when the whole batch failed.
type TransformerResponseT struct {
	// Output is deliberately a plain map rather than types.SingularEventT,
	// since it is not a RudderEvent.
	Output   map[string]interface{} `json:"output"`
	Metadata MetadataT              `json:"metadata"`
	// StatusCode 200 means success; Transform routes every other value into
	// ResponseT.FailedEvents.
	StatusCode int `json:"statusCode"`
	// Error carries the transformer's error text; for failures synthesized by
	// request it is the raw (or annotated) response body.
	Error            string             `json:"error"`
	ValidationErrors []ValidationErrorT `json:"validationErrors"`
}
// ValidationErrorT is one validation problem reported by the transformer in
// TransformerResponseT.ValidationErrors.
// NOTE(review): presumably produced by tracking-plan validation; the set of
// possible Type values and Meta keys is defined by the transformer — confirm there.
type ValidationErrorT struct {
	Type    string            `json:"type"`
	Message string            `json:"message"`
	Meta    map[string]string `json:"meta"`
}
// Setup initializes this class. It is expected to run after Init, because it
// copies pkgLogger and reads the settings that loadConfig registers.
//
// It registers the sent/received counters and the control-plane-down gauge,
// sizes the concurrency semaphore to maxConcurrency and, unless a Client was
// already assigned, builds an HTTP client limited by maxHTTPConnections and
// maxHTTPIdleConnections with timeoutDuration as its request timeout.
func (trans *HandleT) Setup() {
	trans.logger = pkgLogger

	trans.sentStat = stats.Default.NewStat("processor.transformer_sent", stats.CountType)
	trans.receivedStat = stats.Default.NewStat("processor.transformer_received", stats.CountType)
	trans.cpDownGauge = stats.Default.NewStat("processor.control_plane_down", stats.GaugeType)

	trans.guardConcurrency = make(chan struct{}, maxConcurrency)

	// Keep a client that was supplied before Setup.
	if trans.Client != nil {
		return
	}

	transport := &http.Transport{
		MaxConnsPerHost:     maxHTTPConnections,
		MaxIdleConnsPerHost: maxHTTPIdleConnections,
		IdleConnTimeout:     time.Minute,
	}
	trans.Client = &http.Client{Transport: transport, Timeout: timeoutDuration}
}
// ResponseT represents a Transformer response aggregated over all batches of
// one Transform call. Events holds the responses whose StatusCode is 200;
// FailedEvents holds every other response.
type ResponseT struct {
	Events       []TransformerResponseT
	FailedEvents []TransformerResponseT
}
// GetVersion gets the transformer version by asking it on /transformerBuildVersion.
//
// It never returns an error. On success it returns the raw response body;
// otherwise it returns a human-readable placeholder:
//   - request failure or non-200 status: "Not an official release. Get the latest release from dockerhub."
//   - failure reading the body: "Unable to read response from transformer."
//
// The call is bounded by timeoutDuration, or by 30s (the loadConfig default)
// when the config has not been loaded. A bare http.Get uses http.DefaultClient,
// which has no timeout, so a hung transformer could block the caller forever.
func GetVersion() (transformerBuildVersion string) {
	const defaultVersion = "Not an official release. Get the latest release from dockerhub."
	url := integrations.GetTransformerURL() + "/transformerBuildVersion"

	timeout := timeoutDuration
	if timeout <= 0 {
		timeout = 30 * time.Second
	}
	client := &http.Client{Timeout: timeout}

	resp, err := client.Get(url)
	if err != nil {
		pkgLogger.Errorf("Unable to make a transfomer build version call with error : %s", err.Error())
		return defaultVersion
	}
	// net/http guarantees a non-nil resp (with a non-nil Body) whenever err is nil.
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		return defaultVersion
	}

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		pkgLogger.Errorf("Unable to read response into bytes with error : %s", err.Error())
		return "Unable to read response from transformer."
	}
	return string(bodyBytes)
}
// Transform function is used to invoke transformer API.
//
// clientEvents is split into consecutive batches of at most batchSize events.
// Each batch is POSTed to url by request in its own goroutine, with at most
// cap(trans.guardConcurrency) (maxConcurrency) batches in flight at once. The
// per-batch responses are flattened in batch order: responses with
// StatusCode 200 go to Events, all others to FailedEvents.
//
// A batchSize <= 0 means "send everything in one batch", instead of the
// integer divide-by-zero panic this used to cause. Setup must have been
// called: with a nil guardConcurrency the loop below would block forever.
// ctx is used for runtime/trace regions and passed to request. Transform does
// not return early when ctx is cancelled.
func (trans *HandleT) Transform(ctx context.Context, clientEvents []TransformerEventT,
	url string, batchSize int,
) ResponseT {
	if len(clientEvents) == 0 {
		return ResponseT{}
	}
	if batchSize <= 0 {
		batchSize = len(clientEvents)
	}

	// sentStat is registered in Setup but was never incremented; count the
	// events handed to the transformer so it mirrors receivedStat below.
	trans.sentStat.Count(len(clientEvents))

	sTags := statsTags(clientEvents[0])
	batchCount := len(clientEvents) / batchSize
	if len(clientEvents)%batchSize != 0 {
		batchCount++
	}
	stats.Default.NewTaggedStat(
		"processor.transformer_request_batch_count",
		stats.HistogramType,
		sTags,
	).Observe(float64(batchCount))
	trace.Logf(ctx, "request", "batch_count: %d", batchCount)

	transformResponse := make([][]TransformerResponseT, batchCount)
	var wg sync.WaitGroup
	wg.Add(batchCount)
	for i := range transformResponse {
		i := i
		from := i * batchSize
		to := from + batchSize
		if to > len(clientEvents) {
			to = len(clientEvents)
		}
		// Acquire a slot before spawning, so at most maxConcurrency requests run at once.
		trans.guardConcurrency <- struct{}{}
		go func() {
			defer wg.Done()
			trace.WithRegion(ctx, "request", func() {
				transformResponse[i] = trans.request(ctx, url, clientEvents[from:to])
			})
			<-trans.guardConcurrency
		}()
	}
	wg.Wait()

	var outClientEvents, failedEvents []TransformerResponseT
	for _, batch := range transformResponse {
		// Transform is a one-to-many mapping, so each batch yields a slice of
		// responses; flatten them (ranging over a nil batch is a no-op).
		for _, transformerResponse := range batch {
			if transformerResponse.StatusCode != http.StatusOK {
				failedEvents = append(failedEvents, transformerResponse)
				continue
			}
			outClientEvents = append(outClientEvents, transformerResponse)
		}
	}

	trans.receivedStat.Count(len(outClientEvents))
	return ResponseT{
		Events:       outClientEvents,
		FailedEvents: failedEvents,
	}
}
// Validate sends clientEvents to url exactly like Transform (same batching,
// concurrency limit and success/failure split). It takes no caller context;
// context.TODO() is used instead.
func (trans *HandleT) Validate(clientEvents []TransformerEventT,
	url string, batchSize int,
) ResponseT {
	noCallerCtx := context.TODO()
	return trans.Transform(noCallerCtx, clientEvents, url, batchSize)
}
// requestTime records d, the wall-clock duration of one HTTP POST to the
// transformer, on the "processor.transformer_request_time" timer tagged with s.
func (*HandleT) requestTime(s stats.Tags, d time.Duration) {
	timer := stats.Default.NewTaggedStat("processor.transformer_request_time", stats.TimerType, s)
	timer.SendTiming(d)
}
// statsTags builds the metric tags for a transformer request from a single
// representative event: destination definition name, destination ID and
// source ID.
func statsTags(event TransformerEventT) stats.Tags {
	destType := event.Destination.DestinationDefinition.Name
	destID := event.Destination.ID
	srcID := event.Metadata.SourceID
	return stats.Tags{
		"src_id":    srcID,
		"dest_id":   destID,
		"dest_type": destType,
	}
}
// request marshals data, POSTs it to url via doPost, and converts the reply
// into per-event TransformerResponseT values. It returns nil for empty data.
//
// Retries: doPost handles transport errors itself with bounded retries. On
// top of that, a StatusCPDown reply (the transformer cannot reach the control
// plane) is retried without limit, using exponential backoff. cpDownGauge is
// set to 1 while that status persists and to 0 once any other status arrives.
//
// Result: on 200 the decoded response array is returned as-is. For any other
// status, or when a 200 body cannot be decoded (reported as 400), every input
// event gets a failure response with that status code, the response body as
// Error, and the event's original Metadata.
//
// Panics if data cannot be marshaled; doPost panics propagate.
func (trans *HandleT) request(ctx context.Context, url string, data []TransformerEventT) []TransformerResponseT {
	// Nothing to send: skip marshaling and the HTTP round trip entirely.
	if len(data) == 0 {
		return nil
	}

	// Call remote transformation
	var (
		rawJSON []byte
		err     error
	)
	trace.WithRegion(ctx, "marshal", func() {
		rawJSON, err = jsonfast.Marshal(data)
	})
	trace.Logf(ctx, "marshal", "request raw body size: %d", len(rawJSON))
	if err != nil {
		panic(err)
	}

	// Tags depend only on data[0]; build them once rather than on every retry.
	tags := statsTags(data[0])

	var (
		respData   []byte
		statusCode int
	)
	// endless retry if transformer-controlplane connection is down
	endlessBackoff := backoff.NewExponentialBackOff()
	endlessBackoff.MaxElapsedTime = 0 // no max time -> ends only when no error
	// With MaxElapsedTime == 0 the policy never gives up, so RetryNotify only
	// returns once the operation returns nil. Its error is therefore always nil.
	_ = backoff.RetryNotify(
		func() error {
			respData, statusCode = trans.doPost(ctx, rawJSON, url, tags)
			if statusCode == StatusCPDown {
				trans.cpDownGauge.Gauge(1)
				return fmt.Errorf("control plane not reachable")
			}
			trans.cpDownGauge.Gauge(0)
			return nil
		},
		endlessBackoff,
		func(err error, _ time.Duration) {
			trans.logger.Errorf("JS HTTP connection error: URL: %v Error: %+v", url, err)
		})

	// Control plane is back up. Log unexpected statuses; all non-200 statuses
	// are turned into per-event failures below.
	switch statusCode {
	case http.StatusOK,
		http.StatusBadRequest,
		http.StatusNotFound,
		http.StatusRequestEntityTooLarge:
	default:
		trans.logger.Errorf("Transformer returned status code: %v", statusCode)
	}

	var transformerResponses []TransformerResponseT
	if statusCode == http.StatusOK {
		integrations.CollectIntgTransformErrorStats(respData)
		trace.Logf(ctx, "Unmarshal", "response raw size: %d", len(respData))
		trace.WithRegion(ctx, "Unmarshal", func() {
			err = jsonfast.Unmarshal(respData, &transformerResponses)
		})
		// The transformer's JS engine should always return parsable JSON,
		// but handle a malformed body by failing the whole batch.
		if err != nil {
			trans.logger.Errorf("Data sent to transformer : %v", string(rawJSON))
			trans.logger.Errorf("Transformer returned : %v", string(respData))
			respData = []byte(fmt.Sprintf("Failed to unmarshal transformer response: %s", string(respData)))
			transformerResponses = nil
			statusCode = http.StatusBadRequest
		}
	}

	if statusCode != http.StatusOK {
		// Convert the body once; every failure response shares the same message.
		errMsg := string(respData)
		transformerResponses = make([]TransformerResponseT, 0, len(data))
		for i := range data {
			transformerResponses = append(transformerResponses, TransformerResponseT{
				StatusCode: statusCode,
				Error:      errMsg,
				Metadata:   data[i].Metadata,
			})
		}
	}
	return transformerResponses
}
// doPost POSTs rawJSON to url and returns the response body and status code.
//
// Failed POSTs and failed body reads are retried with exponential backoff,
// up to maxRetry retries. Every attempt's latency is recorded via requestTime,
// and every retry is logged as a warning. If all attempts fail, doPost panics
// with the last error.
//
// For a 200 response it also checks the "apiVersion" response header (a
// missing or non-numeric header counts as 0). If the version differs from
// types.SUPPORTED_TRANSFORMER_API_VERSION, doPost logs the mismatch and panics.
func (trans *HandleT) doPost(ctx context.Context, rawJSON []byte, url string, tags stats.Tags) ([]byte, int) {
	var (
		attempts int
		response *http.Response
		body     []byte
	)

	post := func() error {
		var postErr error
		started := time.Now()
		trace.WithRegion(ctx, "request/post", func() {
			response, postErr = trans.Client.Post(url, "application/json; charset=utf-8", bytes.NewBuffer(rawJSON))
		})
		trans.requestTime(tags, time.Since(started))
		if postErr != nil {
			return postErr
		}
		// A failed read is retried like a failed POST.
		body, postErr = io.ReadAll(response.Body)
		_ = response.Body.Close()
		return postErr
	}
	onRetry := func(err error, _ time.Duration) {
		attempts++
		trans.logger.Warnf("JS HTTP connection error: URL: %v Error: %+v after %v tries", url, err, attempts)
	}
	policy := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxRetry))

	if err := backoff.RetryNotify(post, policy, onRetry); err != nil {
		panic(err)
	}

	// Version compatibility is only checked on success.
	if response.StatusCode != http.StatusOK {
		return body, response.StatusCode
	}
	apiVersion, convErr := strconv.Atoi(response.Header.Get("apiVersion"))
	if convErr != nil {
		apiVersion = 0
	}
	if apiVersion != types.SUPPORTED_TRANSFORMER_API_VERSION {
		versionErr := fmt.Errorf("incompatible transformer version: Expected: %d Received: %d, URL: %v", types.SUPPORTED_TRANSFORMER_API_VERSION, apiVersion, url)
		trans.logger.Error(versionErr)
		panic(versionErr)
	}
	return body, response.StatusCode
}