Skip to content

Commit

Permalink
feat: add clusterid to agent (#3858)
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed May 18, 2023
1 parent 9fa35f6 commit a5448d2
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cmd/api-server/main.go
Expand Up @@ -394,7 +394,7 @@ func main() {
if mode == common.ModeAgent {
log.DefaultLogger.Info("starting agent service")

agentHandle, err := agent.NewAgent(log.DefaultLogger, api.Mux.Handler(), cfg.TestkubeCloudAPIKey, grpcClient, cfg.TestkubeCloudWorkerCount, cfg.TestkubeCloudLogStreamWorkerCount, api.GetLogsStream)
agentHandle, err := agent.NewAgent(log.DefaultLogger, api.Mux.Handler(), cfg.TestkubeCloudAPIKey, grpcClient, cfg.TestkubeCloudWorkerCount, cfg.TestkubeCloudLogStreamWorkerCount, api.GetLogsStream, clusterId)
if err != nil {
ui.ExitOnError("Starting agent", err)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/agent.go
Expand Up @@ -26,6 +26,7 @@ import (

const (
apiKeyMeta = "api-key"
clusterIDMeta = "cluster-id"
healthcheckCommand = "healthcheck"
)

Expand Down Expand Up @@ -62,6 +63,8 @@ type Agent struct {
sendTimeout time.Duration
receiveTimeout time.Duration
healthcheckInterval time.Duration

clusterID string
}

func NewAgent(logger *zap.SugaredLogger,
Expand All @@ -71,6 +74,7 @@ func NewAgent(logger *zap.SugaredLogger,
workerCount int,
logStreamWorkerCount int,
logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error),
clusterID string,
) (*Agent, error) {
return &Agent{
handler: handler,
Expand All @@ -88,6 +92,7 @@ func NewAgent(logger *zap.SugaredLogger,
logStreamRequestBuffer: make(chan *cloud.LogsStreamRequest, bufferSizePerWorker*logStreamWorkerCount),
logStreamResponseBuffer: make(chan *cloud.LogsStreamResponse, bufferSizePerWorker*logStreamWorkerCount),
logStreamFunc: logStreamFunc,
clusterID: clusterID,
}, nil
}

Expand Down Expand Up @@ -194,6 +199,8 @@ func (ag *Agent) receiveCommand(ctx context.Context, stream cloud.TestKubeCloudA
func (ag *Agent) runCommandLoop(ctx context.Context) error {
ctx = AddAPIKeyMeta(ctx, ag.apiKey)

ctx = metadata.AppendToOutgoingContext(ctx, clusterIDMeta, ag.clusterID)

ag.logger.Infow("initiating streaming connection with Cloud API")
// creates a new Stream from the client side. ctx is used for the lifetime of the stream.
opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent_test.go
Expand Up @@ -56,7 +56,7 @@ func TestCommandExecution(t *testing.T) {
var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)

logger, _ := zap.NewDevelopment()
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc)
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "")
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/events_test.go
Expand Up @@ -52,7 +52,7 @@ func TestEventLoop(t *testing.T) {
grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn)

var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)
agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc)
agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc, "")
assert.NoError(t, err)
go func() {
l, err := agent.Load()
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/logs_test.go
Expand Up @@ -63,7 +63,7 @@ func TestLogStream(t *testing.T) {
}

logger, _ := zap.NewDevelopment()
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc)
agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "")
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit a5448d2

Please sign in to comment.