-
Notifications
You must be signed in to change notification settings - Fork 307
/
utils.go
101 lines (83 loc) · 2.35 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package utils
import (
"strings"
"time"
"github.com/rudderlabs/rudder-server/config"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
var (
// JobRetention is the package-wide job retention period. loadConfig registers
// it under the "Router.jobRetention" config key, and
// getRetentionTimeForDestination returns it when a destination has no
// "Router.<destID>.jobRetention" key of its own.
JobRetention time.Duration
// EmptyPayload holds the bytes of an empty JSON object.
EmptyPayload = []byte(`{}`)
)
const (
// DRAIN_ERROR_CODE is the integer code 410.
// NOTE(review): presumably the status code recorded for drained jobs
// (410 is HTTP "Gone") — confirm against callers.
DRAIN_ERROR_CODE int = 410
// ERROR_AT_TF is the label for the transformation stage (router or batch).
ERROR_AT_TF = "transformation"
// ERROR_AT_DEL is the label for the event delivery stage.
ERROR_AT_DEL = "delivery"
// ERROR_AT_CUST is the label for the custom destination manager.
ERROR_AT_CUST = "custom"
)
// BatchDestinationT groups one backend-config destination with a list of
// backend-config sources. ToBeDrained reads Destination.Enabled from it.
// NOTE(review): Sources are presumably the sources connected to Destination —
// confirm against the code that populates this struct.
type BatchDestinationT struct {
Destination backendconfig.DestinationT
Sources []backendconfig.SourceT
}
// DrainStats carries a count, a list of reason strings and a workspace string.
// NOTE(review): presumably an aggregate of drained jobs per workspace, with
// Reasons holding values like those returned by ToBeDrained — confirm against
// callers.
type DrainStats struct {
Count int
Reasons []string
Workspace string
}
// SendPostResponse bundles a status code, a response content type and a raw
// response body.
// NOTE(review): presumably the result of posting an event to a destination —
// confirm against the code that constructs it.
type SendPostResponse struct {
StatusCode int
ResponseContentType string
ResponseBody []byte
}
// Init initialises the package by calling loadConfig.
func Init() {
loadConfig()
}
// loadConfig registers JobRetention with the config package under the key
// "Router.jobRetention", passing 720 and time.Hour.
// NOTE(review): presumably this means a default of 720 hours, and the `true`
// argument presumably marks the value as hot-reloadable — confirm against
// config.RegisterDurationConfigVariable.
func loadConfig() {
config.RegisterDurationConfigVariable(720, &JobRetention, true, time.Hour, "Router.jobRetention")
}
// getRetentionTimeForDestination returns the job retention period that applies
// to destID. If the config key "Router.<destID>.jobRetention" is set, its value
// is read with config.GetDuration (passing 720 and time.Hour); otherwise the
// package-level JobRetention is returned.
func getRetentionTimeForDestination(destID string) time.Duration {
	overrideKey := "Router." + destID + ".jobRetention"
	if !config.IsSet(overrideKey) {
		return JobRetention
	}
	return config.GetDuration(overrideKey, 720, time.Hour)
}
// ToBeDrained reports whether job should be drained for destination destID and,
// if so, a short reason. The checks run in this order and the first match wins:
//
//  1. "job expired": job.Parameters has a "received_at" value that parses with
//     the misc.RFC3339Milli layout and is older than the retention period from
//     getRetentionTimeForDestination. A missing or unparsable value skips this
//     check.
//  2. "destination is disabled": destinationsMap has an entry for destID whose
//     Destination.Enabled is false.
//  3. "destination configured to abort": toAbortDestinationIDs is non-empty and,
//     split on ",", contains destID (entries are compared as-is, untrimmed).
//
// When nothing matches it returns (false, "").
func ToBeDrained(job *jobsdb.JobT, destID, toAbortDestinationIDs string, destinationsMap map[string]*BatchDestinationT) (bool, string) {
	// Retention check: only evaluated when received_at is present and parses.
	if receivedAt := gjson.GetBytes(job.Parameters, "received_at"); receivedAt.Exists() {
		receivedTime, parseErr := time.Parse(misc.RFC3339Milli, receivedAt.String())
		if parseErr == nil && time.Since(receivedTime) > getRetentionTimeForDestination(destID) {
			return true, "job expired"
		}
	}

	// A destination unknown to destinationsMap is not treated as disabled.
	dest, known := destinationsMap[destID]
	if known && !dest.Destination.Enabled {
		return true, "destination is disabled"
	}

	if toAbortDestinationIDs == "" {
		return false, ""
	}
	if misc.Contains(strings.Split(toAbortDestinationIDs, ","), destID) {
		return true, "destination configured to abort"
	}
	return false, ""
}
// EnhanceJSON sets key to the string val in rawMsg using sjson.SetBytes and
// returns the resulting bytes. key is handed to sjson as the path to set.
// rawMsg must be valid JSON. If sjson reports an error, a fresh empty JSON
// object (`{}`) is returned instead.
func EnhanceJSON(rawMsg []byte, key, val string) []byte {
	if enriched, err := sjson.SetBytes(rawMsg, key, val); err == nil {
		return enriched
	}
	return []byte(`{}`)
}
// IsNotEmptyString reports whether s contains anything other than leading and
// trailing white space, i.e. whether strings.TrimSpace(s) is non-empty.
func IsNotEmptyString(s string) bool {
	trimmed := strings.TrimSpace(s)
	return trimmed != ""
}