Skip to content

Commit

Permalink
feat(agent): agent development (#3081)
Browse files Browse the repository at this point in the history
* feat: add structure for agent (#2964)

* feat: add structure for agent

* add short and long description for agent command

* fix build

* add go.work

* feat: create client for receiving trigger requests from the server (#2970)

* wip: connect to grpc server

* feat: add connection workflow

* add protobuf for trigger request

* add flow to receive trigger requests

* add comments and rename files

* cleanup makefile

* configure CI to run agent tests

* fix makefile

* add flow for sending trigger result back to the server

* feat: agent client polling methods (#2972)

* add flow for listening for polling requests

* feat: send spans to server

* feat: send trigger requests from agent (#2975)

* feat: add empty trigger worker

* move executor/trigger package to agent and adapt it

* adapt test

* implement trigger

* test trigger flow

* send trigger response to server

* add TODO

* feat: agent polling (#2979)

* feat: implement trace polling on agent

* initialize polling worker on agent start

* pass worker functions directly to client hooks

* feat: agent internal collector (#2997)

* feat: add otlp and http servers with no operation yet

* feat: implement a very basic collector in the agent

* remove tracer from otlp_server structure

* Update agent/collector/http_server.go

Co-authored-by: Daniel Baptista Dias <danielbdias@users.noreply.github.com>

* Update agent/collector/http_server.go

Co-authored-by: Daniel Baptista Dias <danielbdias@users.noreply.github.com>

* reuse otlp servers

* apply daniel's comments to the otlp server code

* remove dependency on ioutil from http otlp server

* delete http server from agent collector

* make collector ingester use otlp ingester interface

---------

Co-authored-by: Daniel Baptista Dias <danielbdias@users.noreply.github.com>

* feat: agent shutdown (#3022)

* shutdown agent when receive shutdown request

* remove unused function

* fix brittle test

* centrilize stop function

* update mod

* fix merge errors;

* fix merge errors;

* more fixes

* feat: add agent token to proto and client (#3068)

feat: add agent identification to agent client proto

* add token to shutdown listener method (#3071)

* fix(agent): rename traceid response property in proto (#3076)

fix(agent): rename traceid response property to traceid

* update go.work.sum

* feat(agent): add kafka agent proto (#3080)

add kafka response

* fix module name

* feat(agent): inject env variables in agent configuration (#3097)

* feat(agent): inject env variables in agent configuration

* add tests to cover all config values are configurable via env vars

* add log at startup and set hostname as default agent name

* remove dev mode config

* fix config defaults test

* feat: make agent wait until it's disconnected to exit (#3109)

feat(agent): make agent wait until connection end

This prevents the agent from exiting right after connecting to the
server

---------

Co-authored-by: Daniel Baptista Dias <danielbdias@users.noreply.github.com>
Co-authored-by: Sebastian Choren <sebastian.choren@gmail.com>
  • Loading branch information
3 people authored Aug 31, 2023
1 parent 68db0b5 commit df2f7c1
Show file tree
Hide file tree
Showing 73 changed files with 14,957 additions and 34 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ jobs:
- name: Run unit tests
run: cd server; make test -B

unit-test-agent:
name: Agent unit tests
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup go
uses: actions/setup-go@v3
with:
go-version-file: "go.work"
cache: true
cache-dependency-path: go.work
- name: Run unit tests
run: cd agent; make test -B


unit-test-web:
name: WebUI unit tests
runs-on: ubuntu-latest
Expand Down
22 changes: 22 additions & 0 deletions agent/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Dependencies:
# https://grpc.io/docs/protoc-installation/

REQUIRED_BINS := protoc

build-proto: ensure-dependencies clean-proto
@protoc \
--go_out=./ \
--go_opt=paths=source_relative \
--go-grpc_out=./ \
--go-grpc_opt=paths=source_relative \
proto/orchestrator.proto

ensure-dependencies:
$(foreach bin,$(REQUIRED_BINS),\
$(if $(shell command -v $(bin) 2> /dev/null),,$(error Please install `$(bin)`)))

clean-proto:
@rm -f proto/*.go

test:
go test -timeout 150s -coverprofile=coverage.out ./...
128 changes: 128 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package client

import (
"context"
"fmt"
"os"
"time"

"github.com/kubeshop/tracetest/agent/proto"
"google.golang.org/grpc"
)

type Config struct {
APIKey string
AgentName string
}

type SessionConfig struct {
BatchTimeout time.Duration
AgentIdentification *proto.AgentIdentification
}

type Client struct {
conn *grpc.ClientConn
config Config
sessionConfig *SessionConfig
done chan bool

triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
}

func (c *Client) Start(ctx context.Context) error {
err := c.startup(ctx)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
go func() {
<-c.done
// We cannot `defer cancel()` in this case because the start listener functions
// start one goroutine each and don't block the execution of this function.
// Thus, if we cancel the context, all those goroutines will fail.
cancel()
}()

err = c.startTriggerListener(ctx)
if err != nil {
return err
}

err = c.startPollerListener(ctx)
if err != nil {
return err
}

err = c.startShutdownListener(ctx)
if err != nil {
return err
}

return nil
}

func (c *Client) WaitUntilDisconnected() {
<-c.done
}

func (c *Client) SessionConfiguration() *SessionConfig {
if c.sessionConfig == nil {
return nil
}

deferredPtr := *c.sessionConfig
return &deferredPtr
}

func (c *Client) Close() error {
c.done <- true
return c.conn.Close()
}

func (c *Client) OnTriggerRequest(listener func(context.Context, *proto.TriggerRequest) error) {
c.triggerListener = listener
}

func (c *Client) OnPollingRequest(listener func(context.Context, *proto.PollingRequest) error) {
c.pollListener = listener
}

func (c *Client) OnConnectionClosed(listener func(context.Context, *proto.ShutdownRequest) error) {
c.shutdownListener = listener
}

func (c *Client) getConnectionRequest() (*proto.ConnectRequest, error) {
name, err := c.getName()
if err != nil {
return nil, err
}

request := proto.ConnectRequest{
ApiKey: c.config.APIKey,
Name: name,
}

return &request, nil
}

// getName retrieves the name of the agent. By default, it is the host name, however,
// it can be overwritten with an environment variable, or a flag.
func (c *Client) getName() (string, error) {
if name := c.config.AgentName; name != "" {
return name, nil
}

if name := os.Getenv("TRACETEST_AGENT_NAME"); name != "" {
return name, nil
}

hostname, err := os.Hostname()
if err != nil {
return "", fmt.Errorf("could not get hostname: %w", err)
}

return hostname, nil
}
37 changes: 37 additions & 0 deletions agent/client/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package client

import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func Connect(ctx context.Context, endpoint string, opts ...Option) (*Client, error) {
conn, err := connect(ctx, endpoint)
if err != nil {
return nil, err
}

client := &Client{conn: conn}
for _, opt := range opts {
opt(client)
}

return client, nil
}

func connect(ctx context.Context, endpoint string) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

// TODO: don't use insecure transportation
conn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("could not connect to server: %w", err)
}

return conn, nil
}
152 changes: 152 additions & 0 deletions agent/client/mocks/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package mocks

import (
"context"
"fmt"
"log"
"net"
"sync"

"github.com/kubeshop/tracetest/agent/proto"
"google.golang.org/grpc"
)

type GrpcServerMock struct {
proto.UnimplementedOrchestratorServer
port int
triggerChannel chan *proto.TriggerRequest
pollingChannel chan *proto.PollingRequest
terminationChannel chan *proto.ShutdownRequest

lastTriggerResponse *proto.TriggerResponse
lastPollingResponse *proto.PollingResponse
}

func NewGrpcServer() *GrpcServerMock {
server := &GrpcServerMock{
triggerChannel: make(chan *proto.TriggerRequest),
pollingChannel: make(chan *proto.PollingRequest),
terminationChannel: make(chan *proto.ShutdownRequest),
}
var wg sync.WaitGroup
wg.Add(1)

go server.start(&wg)

wg.Wait()

return server
}

func (s *GrpcServerMock) Addr() string {
return fmt.Sprintf("localhost:%d", s.port)
}

func (s *GrpcServerMock) start(wg *sync.WaitGroup) {
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

s.port = lis.Addr().(*net.TCPAddr).Port

server := grpc.NewServer()
proto.RegisterOrchestratorServer(server, s)

wg.Done()
if err := server.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

func (s *GrpcServerMock) Connect(ctx context.Context, req *proto.ConnectRequest) (*proto.AgentConfiguration, error) {
return &proto.AgentConfiguration{
Configuration: &proto.SessionConfiguration{
BatchTimeout: 1000,
},
Identification: &proto.AgentIdentification{
Token: "token",
},
}, nil
}

func (s *GrpcServerMock) RegisterTriggerAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterTriggerAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
triggerRequest := <-s.triggerChannel
err := stream.Send(triggerRequest)
if err != nil {
log.Println("could not send trigger request to agent: %w", err)
}

}
}

func (s *GrpcServerMock) SendTriggerResult(ctx context.Context, result *proto.TriggerResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastTriggerResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterPollerAgent(id *proto.AgentIdentification, stream proto.Orchestrator_RegisterPollerAgentServer) error {
if id.Token != "token" {
return fmt.Errorf("could not validate token")
}

for {
pollerRequest := <-s.pollingChannel
err := stream.Send(pollerRequest)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

func (s *GrpcServerMock) SendPolledSpans(ctx context.Context, result *proto.PollingResponse) (*proto.Empty, error) {
if result.AgentIdentification == nil || result.AgentIdentification.Token != "token" {
return nil, fmt.Errorf("could not validate token")
}

s.lastPollingResponse = result
return &proto.Empty{}, nil
}

func (s *GrpcServerMock) RegisterShutdownListener(_ *proto.AgentIdentification, stream proto.Orchestrator_RegisterShutdownListenerServer) error {
for {
shutdownRequest := <-s.terminationChannel
err := stream.Send(shutdownRequest)
if err != nil {
log.Println("could not send polling request to agent: %w", err)
}
}
}

// Test methods

func (s *GrpcServerMock) SendTriggerRequest(request *proto.TriggerRequest) {
s.triggerChannel <- request
}

func (s *GrpcServerMock) SendPollingRequest(request *proto.PollingRequest) {
s.pollingChannel <- request
}

func (s *GrpcServerMock) GetLastTriggerResponse() *proto.TriggerResponse {
return s.lastTriggerResponse
}

func (s *GrpcServerMock) GetLastPollingResponse() *proto.PollingResponse {
return s.lastPollingResponse
}

func (s *GrpcServerMock) TerminateConnection(reason string) {
s.terminationChannel <- &proto.ShutdownRequest{
Reason: reason,
}
}
15 changes: 15 additions & 0 deletions agent/client/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package client

type Option func(*Client)

func WithAPIKey(apiKey string) Option {
return func(c *Client) {
c.config.APIKey = apiKey
}
}

func WithAgentName(name string) Option {
return func(c *Client) {
c.config.AgentName = name
}
}
Loading

0 comments on commit df2f7c1

Please sign in to comment.