/
sink.go
75 lines (59 loc) · 1.73 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package events
import (
"context"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"
"github.com/siderolabs/siderolink/api/events"
)
// Adapter is an abstract event stream receiver.
//
// Sink.Publish calls HandleEvent once for every incoming event whose type is
// registered with the sink and whose payload decodes successfully. The error
// returned by HandleEvent is returned unchanged from Publish.
type Adapter interface {
// HandleEvent processes a single decoded event. The context is the one
// passed to Sink.Publish.
HandleEvent(ctx context.Context, event Event) error
}
// Sink implements events.EventSinkServiceServer.
//
// It decodes published events of the registered types and hands them to an
// Adapter. Create it with NewSink, which initializes the type registry.
type Sink struct {
events.UnimplementedEventSinkServiceServer
// adapter receives every successfully decoded event.
adapter Adapter
// supportedEvents maps a type URL ("type.googleapis.com/" followed by the
// full message name) to a prototype message. Publish clones the prototype
// before unmarshaling a payload into it.
supportedEvents map[string]proto.Message
}
// NewSink builds an event sink that forwards decoded events to a.
//
// Every message in supportedTypes acts as a prototype and is registered under
// its type URL: "type.googleapis.com/" followed by the message's full name.
// Events whose type URL is not registered here are ignored by Publish.
func NewSink(a Adapter, supportedTypes []proto.Message) *Sink {
	registry := make(map[string]proto.Message)

	for _, prototype := range supportedTypes {
		fullName := prototype.ProtoReflect().Descriptor().FullName()
		registry["type.googleapis.com/"+string(fullName)] = prototype
	}

	return &Sink{
		adapter:         a,
		supportedEvents: registry,
	}
}
// Publish implements events.EventSinkServiceServer.
//
// It looks up the event's type URL among the registered types, decodes the
// payload into a fresh copy of the matching prototype message and passes the
// resulting Event to the adapter. Events with an unregistered type URL
// (including requests that carry no data at all) are acknowledged without
// being handled.
//
// The returned error is either the payload unmarshaling error or whatever the
// adapter's HandleEvent returns.
func (s *Sink) Publish(ctx context.Context, e *events.EventRequest) (*events.EventResponse, error) {
	// Use the nil-safe generated getters throughout: direct field access
	// (e.Data.TypeUrl) panics when the request has no Data set.
	typeURL := e.GetData().GetTypeUrl()
	res := &events.EventResponse{}

	prototype := s.supportedEvents[typeURL]
	if prototype == nil {
		// We haven't implemented the handling of this event yet.
		return res, nil
	}

	// Decode into a clone so the registered prototype is never mutated.
	msg := proto.Clone(prototype)
	if err := proto.Unmarshal(e.GetData().GetValue(), msg); err != nil {
		return res, err
	}

	// The node is identified by the remote address of the gRPC peer; it stays
	// empty when the context carries no peer information or no address.
	var node string
	if p, ok := peer.FromContext(ctx); ok && p.Addr != nil {
		node = p.Addr.String()
	}

	return res, s.adapter.HandleEvent(ctx, Event{
		Node:    node,
		TypeURL: typeURL,
		ID:      e.GetId(),
		Payload: msg,
		ActorID: e.GetActorId(),
	})
}