From a5448d22fb6e896739b2c3689b31b5fad2a76f1c Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Thu, 18 May 2023 16:49:19 +0300
Subject: [PATCH] feat: add clusterid to agent (#3858)
---
cmd/api-server/main.go | 2 +-
pkg/agent/agent.go | 7 +++++++
pkg/agent/agent_test.go | 2 +-
pkg/agent/events_test.go | 2 +-
pkg/agent/logs_test.go | 2 +-
5 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go
index b33871a3d3d..11984d883ec 100644
--- a/cmd/api-server/main.go
+++ b/cmd/api-server/main.go
@@ -394,7 +394,7 @@ func main() {
if mode == common.ModeAgent {
log.DefaultLogger.Info("starting agent service")
- agentHandle, err := agent.NewAgent(log.DefaultLogger, api.Mux.Handler(), cfg.TestkubeCloudAPIKey, grpcClient, cfg.TestkubeCloudWorkerCount, cfg.TestkubeCloudLogStreamWorkerCount, api.GetLogsStream)
+ agentHandle, err := agent.NewAgent(log.DefaultLogger, api.Mux.Handler(), cfg.TestkubeCloudAPIKey, grpcClient, cfg.TestkubeCloudWorkerCount, cfg.TestkubeCloudLogStreamWorkerCount, api.GetLogsStream, clusterId)
if err != nil {
ui.ExitOnError("Starting agent", err)
}
diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 099df5f4ea9..e39ebbc3ef1 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -26,6 +26,7 @@ import (
const (
apiKeyMeta = "api-key"
+ clusterIDMeta = "cluster-id"
healthcheckCommand = "healthcheck"
)
@@ -62,6 +63,8 @@ type Agent struct {
sendTimeout time.Duration
receiveTimeout time.Duration
healthcheckInterval time.Duration
+
+ clusterID string
}
func NewAgent(logger *zap.SugaredLogger,
@@ -71,6 +74,7 @@ func NewAgent(logger *zap.SugaredLogger,
workerCount int,
logStreamWorkerCount int,
logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error),
+ clusterID string,
) (*Agent, error) {
return &Agent{
handler: handler,
@@ -88,6 +92,7 @@ func NewAgent(logger *zap.SugaredLogger,
logStreamRequestBuffer: make(chan *cloud.LogsStreamRequest, bufferSizePerWorker*logStreamWorkerCount),
logStreamResponseBuffer: make(chan *cloud.LogsStreamResponse, bufferSizePerWorker*logStreamWorkerCount),
logStreamFunc: logStreamFunc,
+ clusterID: clusterID,
}, nil
}
@@ -194,6 +199,8 @@ func (ag *Agent) receiveCommand(ctx context.Context, stream cloud.TestKubeCloudA
func (ag *Agent) runCommandLoop(ctx context.Context) error {
ctx = AddAPIKeyMeta(ctx, ag.apiKey)
+ ctx = metadata.AppendToOutgoingContext(ctx, clusterIDMeta, ag.clusterID)
+
ag.logger.Infow("initiating streaming connection with Cloud API")
// creates a new Stream from the client side. ctx is used for the lifetime of the stream.
opts := []grpc.CallOption{grpc.UseCompressor(gzip.Name), grpc.MaxCallRecvMsgSize(math.MaxInt32)}
diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go
index cdda2428e82..01597a0d8df 100644
--- a/pkg/agent/agent_test.go
+++ b/pkg/agent/agent_test.go
@@ -56,7 +56,7 @@ func TestCommandExecution(t *testing.T) {
var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)
logger, _ := zap.NewDevelopment()
- agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc)
+ agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "")
if err != nil {
t.Fatal(err)
}
diff --git a/pkg/agent/events_test.go b/pkg/agent/events_test.go
index cc3dcba3b90..d42f7855669 100644
--- a/pkg/agent/events_test.go
+++ b/pkg/agent/events_test.go
@@ -52,7 +52,7 @@ func TestEventLoop(t *testing.T) {
grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn)
var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)
- agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc)
+ agent, err := agent.NewAgent(logger.Sugar(), nil, "api-key", grpcClient, 5, 5, logStreamFunc, "")
assert.NoError(t, err)
go func() {
l, err := agent.Load()
diff --git a/pkg/agent/logs_test.go b/pkg/agent/logs_test.go
index 21e00e6359c..f371d10f131 100644
--- a/pkg/agent/logs_test.go
+++ b/pkg/agent/logs_test.go
@@ -63,7 +63,7 @@ func TestLogStream(t *testing.T) {
}
logger, _ := zap.NewDevelopment()
- agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc)
+ agent, err := agent.NewAgent(logger.Sugar(), m, "api-key", grpcClient, 5, 5, logStreamFunc, "")
if err != nil {
t.Fatal(err)
}