-
Notifications
You must be signed in to change notification settings - Fork 296
/
types.go
146 lines (128 loc) · 5.24 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package types
import (
"encoding/json"
"errors"
"time"
"github.com/samber/lo"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
)
const RouterUnMarshalErrorCode = 599
// RouterJobT holds the router job and its related metadata
type RouterJobT struct {
Message json.RawMessage `json:"message"`
JobMetadata JobMetadataT `json:"metadata"`
Destination backendconfig.DestinationT `json:"destination"`
}
type DestinationJobs []DestinationJobT
// Hydrate jobs in the destination jobs' job metadata array
func (djs DestinationJobs) Hydrate(jobs map[int64]*jobsdb.JobT) {
for i := range djs {
for j := range djs[i].JobMetadataArray {
if djs[i].JobMetadataArray[j].JobT == nil {
djs[i].JobMetadataArray[j].JobT = jobs[djs[i].JobMetadataArray[j].JobID]
}
}
}
}
// DestinationJobT holds the job to be sent to destination
// and metadata of all the router jobs from which this job is cooked up
type DestinationJobT struct {
Message json.RawMessage `json:"batchedRequest"`
JobMetadataArray []JobMetadataT `json:"metadata"` // multiple jobs may be batched in a single message
Destination backendconfig.DestinationT `json:"destination"`
Batched bool `json:"batched"`
StatusCode int `json:"statusCode"`
Error string `json:"error"`
AuthErrorCategory string `json:"authErrorCategory"`
}
func (dj *DestinationJobT) MinJobID() int64 {
return lo.Min(lo.Map(dj.JobMetadataArray, func(item JobMetadataT, _ int) int64 {
return item.JobID
}))
}
// JobIDs returns the set of all job ids contained in the message
func (dj *DestinationJobT) JobIDs() map[int64]struct{} {
jobIDs := make(map[int64]struct{})
for i := range dj.JobMetadataArray {
jobIDs[dj.JobMetadataArray[i].JobID] = struct{}{}
}
return jobIDs
}
// JobMetadataT holds the job metadata
type JobMetadataT struct {
UserID string `json:"userId"`
JobID int64 `json:"jobId"`
SourceID string `json:"sourceId"`
SourceCategory string `json:"sourceCategory"`
DestinationID string `json:"destinationId"`
AttemptNum int `json:"attemptNum"`
ReceivedAt string `json:"receivedAt"`
CreatedAt string `json:"createdAt"`
FirstAttemptedAt string `json:"firstAttemptedAt"`
TransformAt string `json:"transformAt"`
WorkspaceID string `json:"workspaceId"`
Secret json.RawMessage `json:"secret"`
JobT *jobsdb.JobT `json:"jobsT,omitempty"`
WorkerAssignedTime time.Time `json:"workerAssignedTime"`
DestInfo json.RawMessage `json:"destInfo,omitempty"`
DontBatch bool `json:"dontBatch"`
TraceParent string `json:"traceparent"`
}
// TransformMessageT is used to pass message to the transformer workers
type TransformMessageT struct {
Data []RouterJobT `json:"input"`
DestType string `json:"destType"`
}
// Dehydrate JobT information from RouterJobT.JobMetadata returning the dehydrated message along with the jobs
func (tm *TransformMessageT) Dehydrate() (*TransformMessageT, map[int64]*jobsdb.JobT) {
jobs := make(map[int64]*jobsdb.JobT)
tmCopy := *tm
tmCopy.Data = nil
for i := range tm.Data {
tmCopy.Data = append(tmCopy.Data, tm.Data[i])
jobs[tmCopy.Data[i].JobMetadata.JobID] = tmCopy.Data[i].JobMetadata.JobT
tmCopy.Data[i].JobMetadata.JobT = nil
}
return &tmCopy, jobs
}
// JobIDs returns the set of all job ids of the jobs in the message
func (tm *TransformMessageT) JobIDs() map[int64]struct{} {
jobIDs := make(map[int64]struct{})
for i := range tm.Data {
jobIDs[tm.Data[i].JobMetadata.JobID] = struct{}{}
}
return jobIDs
}
func NewEventTypeThrottlingCost(m map[string]interface{}) (v EventTypeThrottlingCost) {
if et, ok := m["eventType"].(map[string]interface{}); ok {
v = et
}
return v
}
type EventTypeThrottlingCost map[string]interface{}
func (e *EventTypeThrottlingCost) Cost(eventType string) (cost int64) {
if v, ok := (*e)[eventType].(float64); ok && v > 0 {
return int64(v)
}
if defaultCost, ok := (*e)["default"].(float64); ok && defaultCost > 0 {
return int64(defaultCost)
}
return 1
}
var (
// ErrContextCancelled is returned when the context is cancelled
ErrContextCancelled = errors.New("context cancelled")
// ErrParamsUnmarshal is returned when it is not possible to unmarshal the job parameters
ErrParamsUnmarshal = errors.New("unmarshal params")
// ErrJobOrderBlocked is returned when the job is blocked by another job discarded by the router in the same loop
ErrJobOrderBlocked = errors.New("blocked")
// ErrWorkerNoSlot is returned when the worker doesn't have an available slot
ErrWorkerNoSlot = errors.New("no slot")
// ErrJobBackoff is returned when the job is backoffed
ErrJobBackoff = errors.New("backoff")
// ErrDestinationThrottled is returned when the destination is being throttled
ErrDestinationThrottled = errors.New("throttled")
// ErrBarrierExists is returned when a job ordering barrier exists for the job's ordering key
ErrBarrierExists = errors.New("barrier")
)