-
Notifications
You must be signed in to change notification settings - Fork 2
/
connector.go
120 lines (101 loc) · 2.84 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package workflow
import (
"context"
"fmt"
"time"
"github.com/luno/workflow/internal/metrics"
)
// ConnectorConstructor builds ConnectorConsumer instances. Make is called
// each time a connector consumer process (re)starts; consumerName is the
// role the consumer runs under.
type ConnectorConstructor interface {
	Make(ctx context.Context, consumerName string) (ConnectorConsumer, error)
}
// ConnectorConsumer is a stream of external events feeding a workflow
// connector. Recv returns the next event together with an Ack function
// used to mark it processed; Close releases the underlying resources.
type ConnectorConsumer interface {
	Recv(ctx context.Context) (*ConnectorEvent, Ack, error)
	Close() error
}
type ConnectorFunc[Type any, Status StatusType] func(ctx context.Context, w *Workflow[Type, Status], e *ConnectorEvent) error
// connectorConfig bundles everything needed to run a connector against a
// workflow: the consumer constructor, the event handler, and tuning knobs.
type connectorConfig[Type any, Status StatusType] struct {
	// name is used to build the consumer's role/process name.
	name string
	// constructor creates the consumer each time the process (re)starts.
	constructor ConnectorConstructor
	// connectorFn is invoked for every event not filtered out by sharding.
	connectorFn ConnectorFunc[Type, Status]
	// pollingFrequency — not referenced in this file; presumably consumed
	// during connector setup elsewhere. TODO confirm.
	pollingFrequency time.Duration
	// errBackOff is the backoff w.run applies when the consumer errors.
	errBackOff time.Duration
	// parallelCount — not referenced in this file; presumably the number of
	// consumer shards launched. TODO confirm.
	parallelCount int
	// lag is the minimum age an event must reach before being processed.
	lag time.Duration
	// lagAlert is the age threshold passed to lag metrics/alerting.
	lagAlert time.Duration
}
// connectorConsumer launches the long-running consumer loop for one shard of
// a connector via w.run. Every (re)start builds a fresh consumer from the
// configured constructor and closes it once the run function exits.
func connectorConsumer[Type any, Status StatusType](w *Workflow[Type, Status], config *connectorConfig[Type, Status], shard, totalShards int) {
	role := makeRole(
		config.name,
		"connector",
		"to",
		w.Name,
		"consumer",
		fmt.Sprintf("%v", shard),
		"of",
		fmt.Sprintf("%v", totalShards),
	)

	// No enum maps to a meaningful string here, so the process name is
	// simply the role itself.
	processName := role

	runFn := func(ctx context.Context) error {
		c, err := config.constructor.Make(ctx, role)
		if err != nil {
			return err
		}
		defer c.Close()

		return connectForever(ctx, w, config, c, processName, shard, totalShards)
	}

	w.run(role, processName, runFn, config.errBackOff)
}
// connectForever consumes events from the connector consumer until the
// context is cancelled or an error occurs. Each event is optionally delayed
// to honour the configured lag, filtered by shard, handed to the connector
// function, and then acked before the loop continues with the next event.
func connectForever[Type any, Status StatusType](
	ctx context.Context,
	w *Workflow[Type, Status],
	config *connectorConfig[Type, Status],
	consumer ConnectorConsumer,
	processName string,
	shard, totalShards int,
) error {
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		e, ack, err := consumer.Recv(ctx)
		if err != nil {
			return err
		}

		// Wait until the event's timestamp matches or is older than the
		// specified lag. (The previous zero-check on config.lag was
		// redundant: a zero lag never yields a positive delay.)
		if lag := config.lag; lag > 0 {
			delay := lag - w.clock.Since(e.CreatedAt)
			if delay > 0 {
				t := w.clock.NewTimer(delay)
				select {
				case <-ctx.Done():
					t.Stop()
					return ctx.Err()
				case <-t.C():
					// Resume now that the event satisfies the lag.
				}
			}
		}

		// Push metrics and alerting around the age of the event being processed.
		pushLagMetricAndAlerting(w.Name, processName, e.CreatedAt, config.lagAlert, w.clock)

		// Skip (but still ack) events that belong to another shard.
		shouldFilter := FilterConnectorEventUsing(e,
			shardConnectorEventFilter(shard, totalShards),
		)
		if shouldFilter {
			if err := ack(); err != nil {
				return err
			}
			continue
		}

		t0 := w.clock.Now()
		if err := config.connectorFn(ctx, w, e); err != nil {
			return err
		}
		metrics.ProcessLatency.WithLabelValues(w.Name, processName).Observe(w.clock.Since(t0).Seconds())

		// Ack and keep consuming. Returning here (as the code previously
		// did) exits the loop after the first processed event, tearing down
		// and rebuilding the consumer for every single event.
		if err := ack(); err != nil {
			return err
		}
	}
}