Skip to content

Commit

Permalink
feat: enable agent handling of stop requests (#3539)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Jan 19, 2024
1 parent c1b0e95 commit eac8b6d
Show file tree
Hide file tree
Showing 20 changed files with 1,281 additions and 817 deletions.
10 changes: 10 additions & 0 deletions agent/client/client.go
Expand Up @@ -44,6 +44,7 @@ type Client struct {

logger *zap.Logger

stopListener func(context.Context, *proto.StopRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
Expand All @@ -66,6 +67,11 @@ func (c *Client) Start(ctx context.Context) error {
cancel()
}()

err = c.startStopListener(ctx)
if err != nil {
return err
}

err = c.startTriggerListener(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -112,6 +118,10 @@ func (c *Client) Close() error {
return c.conn.Close()
}

func (c *Client) OnStopRequest(listener func(context.Context, *proto.StopRequest) error) {
c.stopListener = listener
}

func (c *Client) OnTriggerRequest(listener func(context.Context, *proto.TriggerRequest) error) {
c.triggerListener = listener
}
Expand Down
8 changes: 8 additions & 0 deletions agent/client/mocks/grpc_server.go
Expand Up @@ -88,6 +88,14 @@ func (s *GrpcServerMock) Connect(ctx context.Context, req *proto.ConnectRequest)
}, nil
}

func (s *GrpcServerMock) RegisterStopRequestAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterStopRequestAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

return nil
}

func (s *GrpcServerMock) RegisterTriggerAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterTriggerAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
Expand Down
47 changes: 47 additions & 0 deletions agent/client/workflow_listen_for_stop_requests.go
@@ -0,0 +1,47 @@
package client

import (
"context"
"fmt"
"log"
"time"

"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) startStopListener(ctx context.Context) error {
client := proto.NewOrchestratorClient(c.conn)

stream, err := client.RegisterStopRequestAgent(ctx, c.sessionConfig.AgentIdentification)
if err != nil {
return fmt.Errorf("could not open agent stream: %w", err)
}

go func() {
for {
resp := proto.StopRequest{}
err := stream.RecvMsg(&resp)
if isEndOfFileError(err) || isCancelledError(err) {
return
}

reconnected, err := c.handleDisconnectionError(err)
if reconnected {
return
}

if err != nil {
log.Println("could not get message from trigger stream: %w", err)
time.Sleep(1 * time.Second)
continue
}

// TODO: get context from request
err = c.stopListener(context.Background(), &resp)
if err != nil {
fmt.Println(err.Error())
}
}
}()
return nil
}
19 changes: 19 additions & 0 deletions agent/event/observer.go
Expand Up @@ -18,6 +18,9 @@ type Observer interface {
StartDataStoreConnection(*proto.DataStoreConnectionTestRequest)
EndDataStoreConnection(*proto.DataStoreConnectionTestRequest, error)

StartStopRequest(*proto.StopRequest)
EndStopRequest(*proto.StopRequest, error)

Error(error)
}

Expand Down Expand Up @@ -110,3 +113,19 @@ func (o *wrapperObserver) EndTriggerExecution(request *proto.TriggerRequest, err

o.wrappedObserver.EndTriggerExecution(request, err)
}

func (o *wrapperObserver) StartStopRequest(request *proto.StopRequest) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartStopRequest(request)
}

func (o *wrapperObserver) EndStopRequest(request *proto.StopRequest, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndStopRequest(request, err)
}

0 comments on commit eac8b6d

Please sign in to comment.