This repository has been archived by the owner on Oct 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 62
/
admin_eventsink.go
99 lines (87 loc) · 2.78 KB
/
admin_eventsink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package events
import (
"context"
"fmt"
admin2 "github.com/lyft/flyteidl/clients/go/admin"
"github.com/golang/protobuf/proto"
"github.com/lyft/flyteidl/clients/go/events/errors"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/service"
"github.com/lyft/flytestdlib/logger"
"golang.org/x/time/rate"
)
// adminEventSink is a sink that forwards events to FlyteAdmin through a gRPC
// admin client, gated by a rate limiter.
type adminEventSink struct {
// adminClient is the gRPC client used to create workflow, node and task events.
adminClient service.AdminServiceClient
// rateLimiter is consulted before each event is sent; events are rejected when it denies them.
rateLimiter *rate.Limiter
// cfg is the configuration the sink was constructed with.
cfg *Config
}
// NewAdminEventSink returns an EventSink that delivers events to FlyteAdmin over
// gRPC using adminClient. The sink's rate limiter is built from config.Rate
// (events per second) and config.Capacity (burst size). The returned error is
// always nil.
func NewAdminEventSink(ctx context.Context, adminClient service.AdminServiceClient, config *Config) (EventSink, error) {
	sink := &adminEventSink{
		adminClient: adminClient,
		rateLimiter: rate.NewLimiter(rate.Limit(config.Rate), config.Capacity),
		cfg:         config,
	}

	logger.Infof(ctx, "Created new AdminEventSink to Admin service")
	return sink, nil
}
// Sink forwards a single event to the FlyteAdmin service over gRPC.
//
// The message is dispatched on its concrete type: workflow, node and task
// execution events are wrapped in the matching admin request and sent through
// the admin client. A nil message, or a message of any other type, is rejected
// with an error. When the rate limiter denies the event it is not sent and an
// EventError with code ResourceExhausted is returned. Errors from the admin
// client are returned after being passed through errors.WrapError.
func (s *adminEventSink) Sink(ctx context.Context, message proto.Message) error {
	// Guard first: calling String() on a nil interface below would panic.
	if message == nil {
		return fmt.Errorf("cannot sink a nil event")
	}

	logger.Debugf(ctx, "AdminEventSink received a new event %s", message.String())

	// A denied event is reported to the caller rather than queued or retried here.
	if !s.rateLimiter.Allow() {
		return &errors.EventError{
			Code:    errors.ResourceExhausted,
			Cause:   fmt.Errorf("Admin EventSink throttling admin traffic"),
			Message: "Resource Exhausted",
		}
	}

	switch eventMessage := message.(type) {
	case *event.WorkflowExecutionEvent:
		request := &admin.WorkflowExecutionEventRequest{
			Event: eventMessage,
		}
		if _, err := s.adminClient.CreateWorkflowEvent(ctx, request); err != nil {
			return errors.WrapError(err)
		}
	case *event.NodeExecutionEvent:
		request := &admin.NodeExecutionEventRequest{
			Event: eventMessage,
		}
		if _, err := s.adminClient.CreateNodeEvent(ctx, request); err != nil {
			return errors.WrapError(err)
		}
	case *event.TaskExecutionEvent:
		request := &admin.TaskExecutionEventRequest{
			Event: eventMessage,
		}
		if _, err := s.adminClient.CreateTaskEvent(ctx, request); err != nil {
			return errors.WrapError(err)
		}
	default:
		return fmt.Errorf("unknown event type [%s]", eventMessage.String())
	}

	return nil
}
// Close is a no-op that always returns nil; it does not close the admin
// client's gRPC connection. It can safely be deferred by callers as part of
// shutdown cleanup.
func (s *adminEventSink) Close() error {
return nil
}
// ConstructEventSink builds the EventSink selected by config.Type.
//
// EventSinkLog yields the sink from NewLogSink, EventSinkFile the sink from
// NewFileSink for config.FilePath, and EventSinkAdmin an admin sink backed by
// an admin client initialized from configuration. Any other type falls back to
// the sink from NewStdoutSink. An error from initializing the admin client is
// returned unchanged.
func ConstructEventSink(ctx context.Context, config *Config) (EventSink, error) {
	switch config.Type {
	case EventSinkLog:
		return NewLogSink()
	case EventSinkFile:
		return NewFileSink(config.FilePath)
	case EventSinkAdmin:
		// Handled after the switch: this sink needs an admin client first.
	default:
		return NewStdoutSink()
	}

	client, err := admin2.InitializeAdminClientFromConfig(ctx)
	if err != nil {
		return nil, err
	}
	return NewAdminEventSink(ctx, client, config)
}