Skip to content

Commit

Permalink
feat: pass request context and node address to the events sink adapter
Browse files Browse the repository at this point in the history
Context may be handy to cancel any subsequent calls in the adapter.
While node Addr is required in Sidero to match nodes.

Signed-off-by: Artem Chernyshev <artem.chernyshev@talos-systems.com>
  • Loading branch information
Unix4ever committed Nov 30, 2021
1 parent d0612a7 commit 9902ad2
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cmd/siderolink-agent/event_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ var eventSinkFlags struct {
type adapter struct{}

// HandleEvent implements events.Adapter.
func (s *adapter) HandleEvent(e events.Event) error {
func (s *adapter) HandleEvent(ctx context.Context, e events.Event) error {
data, err := yaml.Marshal(e.Payload)
if err != nil {
return err
}

log.Printf("Event: %s, ID: %s, Payload: \n\t%s", e.TypeURL, e.ID, strings.Join(strings.Split(string(data), "\n"), "\n\t"))
log.Printf("Node: %s, Event: %s, ID: %s, Payload: \n\t%s", e.Node, e.TypeURL, e.ID, strings.Join(strings.Split(string(data), "\n"), "\n\t"))

return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type Event struct {
Payload proto.Message
TypeURL string
ID string
Node string
}
13 changes: 11 additions & 2 deletions pkg/events/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"context"

"github.com/talos-systems/talos/pkg/machinery/api/machine"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"

"github.com/talos-systems/siderolink/api/events"
)

// Adapter is an abstract event stream receiver.
type Adapter interface {
HandleEvent(event Event) error
HandleEvent(ctx context.Context, event Event) error
}

// Sink implements events.EventSinkServiceServer.
Expand Down Expand Up @@ -64,7 +65,15 @@ func (s *Sink) Publish(ctx context.Context, e *events.EventRequest) (*events.Eve
return res, err
}

return res, s.adapter.HandleEvent(Event{
var node string

peer, ok := peer.FromContext(ctx)
if ok {
node = peer.Addr.String()
}

return res, s.adapter.HandleEvent(ctx, Event{
Node: node,
TypeURL: typeURL,
ID: e.Id,
Payload: msg,
Expand Down
6 changes: 5 additions & 1 deletion pkg/events/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ type state struct {
}

// HandleEvent implements events.Adapter.
func (s *state) HandleEvent(e events.Event) error {
func (s *state) HandleEvent(ctx context.Context, e events.Event) error {
s.stateMu.Lock()
defer s.stateMu.Unlock()

if e.Node == "" {
return fmt.Errorf("node address information is empty")
}

switch msg := e.Payload.(type) {
case *machine.AddressEvent:
s.Addresses = msg.Addresses
Expand Down

0 comments on commit 9902ad2

Please sign in to comment.