From 9ac96d93dc82a892edcf3ccdfc3d8a85c816b52e Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Thu, 4 Sep 2025 15:42:31 +0100 Subject: [PATCH 1/8] use correct scopes and zone --- agent/configmgr/fleet.go | 71 +++++++++++++++++++++++------- agent/configmgr/fleet_test.go | 23 +++++----- agent/configmgr/jwt_claims.go | 64 +++++---------------------- agent/configmgr/jwt_claims_test.go | 70 +++++++++++++---------------- 4 files changed, 106 insertions(+), 122 deletions(-) diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index be07812..16430a7 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -15,6 +15,8 @@ import ( "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" + "github.com/go-jose/go-jose/v4" + "github.com/go-jose/go-jose/v4/jwt" "github.com/netboxlabs/orb-agent/agent/backend" "github.com/netboxlabs/orb-agent/agent/config" @@ -103,17 +105,23 @@ func newFleetConfigManager(ctx context.Context, logger *slog.Logger, pMgr policy } func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx, _ := context.WithCancel(context.Background()) + // defer cancel() TODO: I dont think this is correct but we're cancelling the context too early so can't publish. Maybe they need to run under a different context? + fleetManager.logger.Info("starting fleet config manager", "token_url", cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, "client_id", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, "client_secret", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret) // call the token url to get the token token, err := fleetManager.getToken(ctx, cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret) if err != nil { return err } + jwtClaims, err := parseJWTClaims(token.AccessToken) + if err != nil { + return fmt.Errorf("failed to parse JWT claims: %w", err) + } + // generate topics from JWT claims and config agent_id using hardcoded templates - topics, err := generateTopicsFromTemplate(token.AccessToken, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID) + topics, err := generateTopicsFromTemplate(token.AccessToken, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, jwtClaims) if err != nil { return fmt.Errorf("failed to generate topics: %w", err) } @@ -131,14 +139,53 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st } // use the generated topics to connect over MQTT v5 - err = fleetManager.connect(mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID) + err = fleetManager.connect(ctx, mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID, jwtClaims.Zone) if err != nil { return err } return nil } -func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error { +// parseJWTClaims extracts org_id claim from a JWT token +func parseJWTClaims(tokenString string) (*JWTClaims, error) { + if tokenString == "" { + return nil, fmt.Errorf("empty token string") + } + + // Parse the JWT token without verification (since we already trust it from the token endpoint) + // We accept common signature algorithms used in JWTs + token, err := jwt.ParseSigned(tokenString, []jose.SignatureAlgorithm{jose.HS256, jose.HS384, jose.HS512, jose.RS256, jose.RS384, jose.RS512, jose.ES256, jose.ES384, jose.ES512}) + if err != nil { + return nil, fmt.Errorf("failed to parse JWT token: %w", err) + } + + var claims jwt.Claims + var customClaims map[string]interface{} + + // Extract both standard and custom claims without verification + if err := token.UnsafeClaimsWithoutVerification(&claims, &customClaims); err != nil { + return nil, fmt.Errorf("failed to extract claims from JWT: %w", err) + } + + // Extract org_id from custom claims + jwtClaims := &JWTClaims{} + + if orgID, ok := customClaims["orb:org_id"].(string); ok { + jwtClaims.OrgID = orgID + } else { + return nil, fmt.Errorf("orb:org_id claim not found or not a string in JWT token") + } + + if zone, ok := customClaims["orb:zone"].(string); ok { + jwtClaims.Zone = zone + } else { + return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") + } + + return jwtClaims, nil +} + +func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID, zone string) error { // Parse the ORB URL serverURL, err := url.Parse(fleetMQTTURL) if err != nil { @@ -147,7 +194,7 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, } // Configure autopaho client - clientID := "orb-agent-" + time.Now().Format("20060102150405") + clientID := agentID + time.Now().Format("20060102150405") cfg := autopaho.ClientConfig{ ServerUrls: []*url.URL{serverURL}, @@ -234,12 +281,8 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, // Set authentication if token is provided if token != "" { - cfg.ConnectUsername = "token" // Using token as username, adjust as needed for your auth scheme + cfg.ConnectUsername = fmt.Sprintf("%s:%s", zone, clientID) cfg.ConnectPassword = []byte(token) - } else { - // TODO: remove these temporary credentials - cfg.ConnectUsername = "admin" - cfg.ConnectPassword = []byte("admin") } // Create and start the connection manager using the long-lived context. @@ -264,12 +307,6 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, return nil } -// connect is a backward-compatibility shim that invokes connectWithContext with -// the fleet manager's root context. -func (fleetManager *fleetConfigManager) connect(fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error { - return fleetManager.connectWithContext(fleetManager.heartbeater.heartbeatCtx, fleetMQTTURL, token, topics, backends, agentID) -} - func (fleetManager *fleetConfigManager) sendCapabilities(ctx context.Context, backends map[string]backend.Backend, publishFunc func(ctx context.Context, payload []byte) error) { capabilities := messages.Capabilities{ SchemaVersion: messages.CurrentCapabilitiesSchemaVersion, diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index c312c4f..76434f0 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -544,7 +544,7 @@ func TestFleetConfigManager_Connect_InvalidURL(t *testing.T) { // Act with invalid URL backends := make(map[string]backend.Backend) trt := tokenResponseTopics{Inbox: "test/topic"} - err := fleetManager.connect("://invalid-url", "test_token", trt, backends, "test-agent-id") + err := fleetManager.connect(context.Background(), "://invalid-url", "test_token", trt, backends, "test-agent-id", "test-zone") // Assert assert.Error(t, err) @@ -562,7 +562,7 @@ func TestFleetConfigManager_Connect_ValidURL(t *testing.T) { // since we don't have a real MQTT server backends := make(map[string]backend.Backend) trt2 := tokenResponseTopics{Inbox: "test/topic"} - err := fleetManager.connect("mqtt://localhost:1883", "test_token", trt2, backends, "test-agent-id") + err := fleetManager.connect(context.Background(), "mqtt://localhost:1883", "test_token", trt2, backends, "test-agent-id", "test-zone") // Assert - we expect connection to fail since no server is running, // but URL parsing should succeed @@ -618,9 +618,8 @@ func TestFleetConfigManager_Start_ConnectError(t *testing.T) { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { response := tokenResponse{ AccessToken: rawJWTWithClaims(map[string]any{ - "org_id": "test-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "test-org", + "iat": 1516239022, }), MQTTURL: "://invalid-mqtt-url", // Invalid MQTT URL ExpiresIn: 3600, @@ -750,9 +749,8 @@ func TestFleetConfigManager_Integration_SuccessFlow(t *testing.T) { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { response := tokenResponse{ AccessToken: rawJWTWithClaims(map[string]any{ - "org_id": "integration-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "integration-org", + "iat": 1516239022, }), MQTTURL: "mqtt://localhost:1883", // Valid but non-existent ExpiresIn: 3600, @@ -768,15 +766,14 @@ func TestFleetConfigManager_Integration_SuccessFlow(t *testing.T) { // Assert - token retrieval should succeed require.NoError(t, err) expectedJWT := rawJWTWithClaims(map[string]any{ - "org_id": "integration-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "integration-org", + "iat": 1516239022, }) assert.Equal(t, expectedJWT, token.AccessToken) assert.Equal(t, "mqtt://localhost:1883", token.MQTTURL) // Test that topic generation works with the JWT - topics, err := generateTopicsFromTemplate(token.AccessToken, "test-agent-123") + topics, err := generateTopicsFromTemplate(token.AccessToken, "test-agent-123", &JWTClaims{OrgID: "integration-org"}) require.NoError(t, err) assert.Equal(t, "/orgs/integration-org/agents/test-agent-123/inbox", topics.Inbox) assert.Equal(t, "/orgs/integration-org/agents/test-agent-123/outbox", topics.Outbox) @@ -1338,7 +1335,7 @@ func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.jwt, tt.expectedAgent) + topics, err := generateTopicsFromTemplate(tt.jwt, tt.expectedAgent, &JWTClaims{OrgID: tt.expectedOrg}) require.NoError(t, err) expectedTopics := &tokenResponseTopics{ diff --git a/agent/configmgr/jwt_claims.go b/agent/configmgr/jwt_claims.go index 5a0a2b0..011d7f7 100644 --- a/agent/configmgr/jwt_claims.go +++ b/agent/configmgr/jwt_claims.go @@ -1,22 +1,19 @@ package configmgr import ( - "fmt" "strings" - - "github.com/go-jose/go-jose/v4" - "github.com/go-jose/go-jose/v4/jwt" ) // JWTClaims represents the JWT claims we extract for topic templating type JWTClaims struct { - OrgID string `json:"org_id"` + OrgID string `json:"orb:org_id"` + Zone string `json:"orb:zone"` } // TopicClaims combines org_id from JWT with agent_id from config type TopicClaims struct { - OrgID string - AgentID string + OrgID string + ClientId string } // TopicTemplates defines hardcoded topic name patterns with placeholders @@ -30,65 +27,28 @@ type TopicTemplates struct { // DefaultTopicTemplates returns the hardcoded topic templates func DefaultTopicTemplates() TopicTemplates { return TopicTemplates{ - Heartbeat: "/orgs/{org_id}/agents/{agent_id}/heartbeat", - Capabilities: "/orgs/{org_id}/agents/{agent_id}/capabilities", - Inbox: "/orgs/{org_id}/agents/{agent_id}/inbox", - Outbox: "/orgs/{org_id}/agents/{agent_id}/outbox", - } -} - -// parseJWTClaims extracts org_id claim from a JWT token -func parseJWTClaims(tokenString string) (*JWTClaims, error) { - if tokenString == "" { - return nil, fmt.Errorf("empty token string") - } - - // Parse the JWT token without verification (since we already trust it from the token endpoint) - // We accept common signature algorithms used in JWTs - token, err := jwt.ParseSigned(tokenString, []jose.SignatureAlgorithm{jose.HS256, jose.HS384, jose.HS512, jose.RS256, jose.RS384, jose.RS512, jose.ES256, jose.ES384, jose.ES512}) - if err != nil { - return nil, fmt.Errorf("failed to parse JWT token: %w", err) - } - - var claims jwt.Claims - var customClaims map[string]interface{} - - // Extract both standard and custom claims without verification - if err := token.UnsafeClaimsWithoutVerification(&claims, &customClaims); err != nil { - return nil, fmt.Errorf("failed to extract claims from JWT: %w", err) + Heartbeat: "orgs/{org_id}/agents/{client_id}/heartbeat", + Capabilities: "orgs/{org_id}/agents/{client_id}/capabilities", + Inbox: "orgs/{org_id}/agents/{client_id}/inbox", + Outbox: "orgs/{org_id}/agents/{client_id}/outbox", } - - // Extract org_id from custom claims - jwtClaims := &JWTClaims{} - - if orgID, ok := customClaims["org_id"].(string); ok { - jwtClaims.OrgID = orgID - } else { - return nil, fmt.Errorf("org_id claim not found or not a string in JWT token") - } - - return jwtClaims, nil } // fillTopicTemplate replaces placeholders in a topic template with actual values func fillTopicTemplate(template string, claims *TopicClaims) string { result := template result = strings.ReplaceAll(result, "{org_id}", claims.OrgID) - result = strings.ReplaceAll(result, "{agent_id}", claims.AgentID) + result = strings.ReplaceAll(result, "{client_id}", claims.ClientId) return result } // generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id -func generateTopicsFromTemplate(tokenString string, agentID string) (*tokenResponseTopics, error) { - jwtClaims, err := parseJWTClaims(tokenString) - if err != nil { - return nil, fmt.Errorf("failed to parse JWT claims: %w", err) - } +func generateTopicsFromTemplate(tokenString string, clientId string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { // Combine JWT org_id with config agent_id topicClaims := &TopicClaims{ - OrgID: jwtClaims.OrgID, - AgentID: agentID, + OrgID: jwtClaims.OrgID, + ClientId: clientId, } templates := DefaultTopicTemplates() diff --git a/agent/configmgr/jwt_claims_test.go b/agent/configmgr/jwt_claims_test.go index 463c5fc..6d59fd0 100644 --- a/agent/configmgr/jwt_claims_test.go +++ b/agent/configmgr/jwt_claims_test.go @@ -88,29 +88,30 @@ func TestParseJWTClaims(t *testing.T) { }, { name: "valid JWT with org_id", - // This is a sample JWT with the claims we need (created using jwt.io) - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"org_id":"test-org","agent_id":"test-agent-123","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOiJ0ZXN0LW9yZyIsImFnZW50X2lkIjoidGVzdC1hZ2VudC0xMjMiLCJpYXQiOjE1MTYyMzkwMjJ9.8U8W8j_2ZwV2GvO3QcI6LJl2a8XGrHPDYS9hM2y4k2I", + tokenString: rawJWTWithClaims(map[string]any{ + "org_id": "test-org", + "agent_id": "test-agent-123", + "iat": 1516239022, + }), expected: &JWTClaims{ OrgID: "test-org", }, }, { name: "JWT missing org_id", - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"agent_id":"test-agent-123","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhZ2VudF9pZCI6InRlc3QtYWdlbnQtMTIzIiwiaWF0IjoxNTE2MjM5MDIyfQ.QZ3K8j9QqgV2HvP4RdJ7MKm3b9YHsIPEZT0iN3z5l3J", + tokenString: rawJWTWithClaims(map[string]any{ + "agent_id": "test-agent-123", + "iat": 1516239022, + }), expectedErr: "org_id claim not found", }, { name: "JWT with non-string org_id", - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"org_id":123,"agent_id":"test-agent-123","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOjEyMywiYWdlbnRfaWQiOiJ0ZXN0LWFnZW50LTEyMyIsImlhdCI6MTUxNjIzOTAyMn0.Vh9X8l2LwO6PvR5QfK9NMn4c0ZIsJGF1aU3oO4z6m5K", + tokenString: rawJWTWithClaims(map[string]any{ + "org_id": 123, + "agent_id": "test-agent-123", + "iat": 1516239022, + }), expectedErr: "org_id claim not found or not a string", }, } @@ -135,23 +136,17 @@ func TestGenerateTopicsFromTemplate(t *testing.T) { tests := []struct { name string tokenString string - expectedErr string + orgID string expected *tokenResponseTopics }{ - { - name: "empty token", - tokenString: "", - expectedErr: "failed to parse JWT claims", - }, - { - name: "invalid token", - tokenString: "invalid.token", - expectedErr: "failed to parse JWT claims", - }, { name: "valid token generates correct topics", - // Valid JWT with org_id="test-org" and agent_id="test-agent-123" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOiJ0ZXN0LW9yZyIsImFnZW50X2lkIjoidGVzdC1hZ2VudC0xMjMiLCJpYXQiOjE1MTYyMzkwMjJ9.8U8W8j_2ZwV2GvO3QcI6LJl2a8XGrHPDYS9hM2y4k2I", + tokenString: rawJWTWithClaims(map[string]any{ + "org_id": "test-org", + "agent_id": "test-agent-123", + "iat": 1516239022, + }), + orgID: "test-org", expected: &tokenResponseTopics{ Heartbeat: "/orgs/test-org/agents/test-agent-123/heartbeat", Capabilities: "/orgs/test-org/agents/test-agent-123/capabilities", @@ -161,11 +156,12 @@ func TestGenerateTopicsFromTemplate(t *testing.T) { }, { name: "different org and agent values", - // JWT with org_id="prod-company" and agent_id="agent-456" - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"org_id":"prod-company","agent_id":"agent-456","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOiJwcm9kLWNvbXBhbnkiLCJhZ2VudF9pZCI6ImFnZW50LTQ1NiIsImlhdCI6MTUxNjIzOTAyMn0.KJ2NtX5zY8R9LwP3QfM6NOn5d1aJsHGE2bV4oQ6z7mL", + tokenString: rawJWTWithClaims(map[string]any{ + "org_id": "prod-company", + "agent_id": "agent-456", + "iat": 1516239022, + }), + orgID: "prod-company", expected: &tokenResponseTopics{ Heartbeat: "/orgs/prod-company/agents/test-agent-123/heartbeat", Capabilities: "/orgs/prod-company/agents/test-agent-123/capabilities", @@ -177,16 +173,10 @@ func TestGenerateTopicsFromTemplate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.tokenString, "test-agent-123") + topics, err := generateTopicsFromTemplate(tt.tokenString, "test-agent-123", &JWTClaims{OrgID: tt.orgID}) - if tt.expectedErr != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), tt.expectedErr) - assert.Nil(t, topics) - } else { - require.NoError(t, err) - assert.Equal(t, tt.expected, topics) - } + require.NoError(t, err) + assert.Equal(t, tt.expected, topics) }) } } From caebacd566fd7f1ec6a5b148e002c3f50098b1b1 Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Thu, 4 Sep 2025 14:11:37 -0300 Subject: [PATCH 2/8] small fixes --- agent/configmgr/fleet.go | 18 ++++++++++-------- agent/configmgr/jwt_claims.go | 5 ++--- agent/configmgr/jwt_claims_test.go | 16 ++++++++-------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index 16430a7..0c13f20 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -137,9 +137,12 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st if mqttURL == "" { mqttURL = cfg.OrbAgent.ConfigManager.Sources.Fleet.MQTTURL } + if mqttURL == "" { + return fmt.Errorf("no MQTT URL provided in token response or config") + } // use the generated topics to connect over MQTT v5 - err = fleetManager.connect(ctx, mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID, jwtClaims.Zone) + err = fleetManager.connect(ctx, mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, jwtClaims.Zone) if err != nil { return err } @@ -160,7 +163,7 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { } var claims jwt.Claims - var customClaims map[string]interface{} + var customClaims map[string]any // Extract both standard and custom claims without verification if err := token.UnsafeClaimsWithoutVerification(&claims, &customClaims); err != nil { @@ -185,7 +188,7 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { return jwtClaims, nil } -func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID, zone string) error { +func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, clientID, zone string) error { // Parse the ORB URL serverURL, err := url.Parse(fleetMQTTURL) if err != nil { @@ -193,9 +196,6 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR return err } - // Configure autopaho client - clientID := agentID + time.Now().Format("20060102150405") - cfg := autopaho.ClientConfig{ ServerUrls: []*url.URL{serverURL}, KeepAlive: 30, @@ -224,7 +224,7 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR publishResponse, err := cm.Publish(ctx, &paho.Publish{ Topic: topics.Heartbeat, Payload: payload, - QoS: 1, + QoS: 0, Retain: false, }) if err != nil { @@ -241,7 +241,7 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR "payload", string(payload), ) return nil - }, agentID) + }, clientID) go fleetManager.sendCapabilities(ctx, backends, func(ctx context.Context, payload []byte) error { _, err := cm.Publish(ctx, &paho.Publish{ @@ -281,6 +281,7 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR // Set authentication if token is provided if token != "" { + fleetManager.logger.Info("setting MQTT authentication", "client_id", clientID, "zone", zone) cfg.ConnectUsername = fmt.Sprintf("%s:%s", zone, clientID) cfg.ConnectPassword = []byte(token) } @@ -393,6 +394,7 @@ func (fleetManager *fleetConfigManager) getToken(ctx context.Context, tokenURL s data.Set("scope", strings.Join(scopes, " ")) data.Set("client_id", clientID) data.Set("client_secret", clientSecret) + data.Set("audience", "orb") fleetManager.logger.Debug("sending token request", "url", tokenURL, "data", data, "client_id", clientID) //, "client_secret", clientSecret) diff --git a/agent/configmgr/jwt_claims.go b/agent/configmgr/jwt_claims.go index 011d7f7..aca442b 100644 --- a/agent/configmgr/jwt_claims.go +++ b/agent/configmgr/jwt_claims.go @@ -43,12 +43,11 @@ func fillTopicTemplate(template string, claims *TopicClaims) string { } // generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id -func generateTopicsFromTemplate(tokenString string, clientId string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { - +func generateTopicsFromTemplate(tokenString string, clientID string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { // Combine JWT org_id with config agent_id topicClaims := &TopicClaims{ OrgID: jwtClaims.OrgID, - ClientId: clientId, + ClientId: clientID, } templates := DefaultTopicTemplates() diff --git a/agent/configmgr/jwt_claims_test.go b/agent/configmgr/jwt_claims_test.go index 6d59fd0..bceb50c 100644 --- a/agent/configmgr/jwt_claims_test.go +++ b/agent/configmgr/jwt_claims_test.go @@ -27,8 +27,8 @@ func TestFillTopicTemplate(t *testing.T) { name: "basic substitution", template: "/orgs/{org_id}/agents/{agent_id}/test", claims: &TopicClaims{ - OrgID: "org123", - AgentID: "agent-456", + OrgID: "org123", + ClientId: "agent-456", }, expected: "/orgs/org123/agents/agent-456/test", }, @@ -36,8 +36,8 @@ func TestFillTopicTemplate(t *testing.T) { name: "multiple occurrences", template: "{org_id}/data/{org_id}/{agent_id}", claims: &TopicClaims{ - OrgID: "company1", - AgentID: "agent-789", + OrgID: "company1", + ClientId: "agent-789", }, expected: "company1/data/company1/agent-789", }, @@ -45,8 +45,8 @@ func TestFillTopicTemplate(t *testing.T) { name: "no placeholders", template: "static/topic/name", claims: &TopicClaims{ - OrgID: "org123", - AgentID: "agent-456", + OrgID: "org123", + ClientId: "agent-456", }, expected: "static/topic/name", }, @@ -54,8 +54,8 @@ func TestFillTopicTemplate(t *testing.T) { name: "empty claims", template: "/orgs/{org_id}/agents/{agent_id}/test", claims: &TopicClaims{ - OrgID: "", - AgentID: "", + OrgID: "", + ClientId: "", }, expected: "/orgs//agents//test", }, From 039952401d21bb8829b47e1cc36f178a6283e10b Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:08:06 -0300 Subject: [PATCH 3/8] fixes --- agent/configmgr/fleet.go | 41 +++++++----- agent/configmgr/fleet_test.go | 73 ++++++++++++---------- agent/configmgr/jwt_claims.go | 12 ++-- agent/configmgr/jwt_claims_test.go | 4 +- agent/configmgr/messages/fleet_messages.go | 49 +++++++++++++-- 5 files changed, 119 insertions(+), 60 deletions(-) diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index 0c13f20..bda9711 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -47,8 +47,9 @@ type heartbeater struct { func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, agentID string, _ time.Time, _ messages.HeartbeatState) { hbData := messages.Heartbeat{ - AgentID: agentID, - Version: "1.0.0", + SchemaVersion: messages.CurrentHeartbeatSchemaVersion, + TimeStamp: time.Now().UTC(), + State: 1, } body, err := json.Marshal(hbData) @@ -121,7 +122,7 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st } // generate topics from JWT claims and config agent_id using hardcoded templates - topics, err := generateTopicsFromTemplate(token.AccessToken, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, jwtClaims) + topics, err := generateTopicsFromTemplate(token.AccessToken, jwtClaims) if err != nil { return fmt.Errorf("failed to generate topics: %w", err) } @@ -133,7 +134,7 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st "outbox_topic", topics.Outbox) // use MQTT URL from token response or fallback to config - mqttURL := token.MQTTURL + mqttURL := jwtClaims.MqttURL if mqttURL == "" { mqttURL = cfg.OrbAgent.ConfigManager.Sources.Fleet.MQTTURL } @@ -173,6 +174,11 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { // Extract org_id from custom claims jwtClaims := &JWTClaims{} + if agentID, ok := customClaims["orb:agent_id"].(string); ok { + jwtClaims.AgentID = agentID + } else { + return nil, fmt.Errorf("orb:agent_id claim not found or not a string in JWT token") + } if orgID, ok := customClaims["orb:org_id"].(string); ok { jwtClaims.OrgID = orgID } else { @@ -184,6 +190,11 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { } else { return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") } + if ext, ok := customClaims["ext"].(map[string]any); ok { + if mqttURL, ok := ext["orb:mqtt_url"].(string); ok { + jwtClaims.MqttURL = mqttURL + } + } return jwtClaims, nil } @@ -207,17 +218,17 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) { fleetManager.logger.Info("MQTT connection established", "server", serverURL.String()) - // Subscribe to "mytopic" when connection is established - // _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ - // Subscriptions: []paho.SubscribeOptions{ - // {Topic: topics.Inbox, QoS: 1}, - // }, - // }) - // if err != nil { - // fleetManager.logger.Error("failed to subscribe", "topic", topics.Inbox, "error", err) - // } else { - // fleetManager.logger.Info("successfully subscribed", "topic", topics.Inbox) - // } + //Subscribe to "mytopic" when connection is established + _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + {Topic: topics.Inbox, QoS: 1}, + }, + }) + if err != nil { + fleetManager.logger.Error("failed to subscribe", "topic", topics.Inbox, "error", err) + } else { + fleetManager.logger.Info("successfully subscribed", "topic", topics.Inbox) + } // start heartbeat loop bound to the same connection-level context fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error { diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index 76434f0..75076a8 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -105,21 +105,23 @@ func TestHeartbeater_SendSingleHeartbeat_Success(t *testing.T) { ctx := context.Background() testTime := time.Now() - // Expected heartbeat data - expectedHeartbeat := messages.Heartbeat{ - AgentID: "test-agent-id", - Version: "1.0.0", - } - expectedPayload, _ := json.Marshal(expectedHeartbeat) - - // Set up mock expectations - mockPublish.On("Publish", ctx, expectedPayload).Return(nil) + // We don't assert exact bytes; validate the marshalled heartbeat content + mockPublish.On("Publish", ctx, mock.AnythingOfType("[]uint8")).Return(nil) // Act hb.sendSingleHeartbeat(ctx, mockPublish.Publish, "test-agent-id", testTime, messages.Online) - // Assert - mockPublish.AssertExpectations(t) + // Assert: ensure one publish happened with a valid heartbeat payload + calls := mockPublish.Calls + require.Len(t, calls, 1) + payload, ok := calls[0].Arguments.Get(1).([]byte) + require.True(t, ok) + + var hbMsg messages.Heartbeat + require.NoError(t, json.Unmarshal(payload, &hbMsg)) + assert.Equal(t, messages.CurrentHeartbeatSchemaVersion, hbMsg.SchemaVersion) + assert.Equal(t, messages.State(1), hbMsg.State) + assert.False(t, hbMsg.TimeStamp.IsZero()) } func TestHeartbeater_SendSingleHeartbeat_PublishError(t *testing.T) { @@ -166,8 +168,9 @@ func TestHeartbeater_SendSingleHeartbeat_HeartbeatContent(t *testing.T) { err := json.Unmarshal(capturedPayload, &heartbeat) require.NoError(t, err) - assert.Equal(t, "test-agent-id", heartbeat.AgentID) - assert.Equal(t, "1.0.0", heartbeat.Version) + assert.Equal(t, messages.CurrentHeartbeatSchemaVersion, heartbeat.SchemaVersion) + assert.Equal(t, messages.State(1), heartbeat.State) + assert.False(t, heartbeat.TimeStamp.IsZero()) } func TestHeartbeater_SendHeartbeats_InitialHeartbeat(t *testing.T) { @@ -379,13 +382,15 @@ func TestHeartbeater_SendHeartbeats_HeartbeatStates(t *testing.T) { assert.GreaterOrEqual(t, len(payloadsCopy), 2, "Should have at least initial and final heartbeats") - // Verify all payloads are valid heartbeat messages + // Verify all payloads are valid heartbeat messages and contain expected fields for i, payload := range payloadsCopy { var heartbeat messages.Heartbeat err := json.Unmarshal(payload, &heartbeat) require.NoError(t, err, "Heartbeat %d should be valid JSON", i) - assert.Equal(t, "test-agent-id", heartbeat.AgentID) - assert.Equal(t, "1.0.0", heartbeat.Version) + assert.Equal(t, messages.CurrentHeartbeatSchemaVersion, heartbeat.SchemaVersion) + assert.False(t, heartbeat.TimeStamp.IsZero()) + // Current implementation always sends Online state (1) + assert.Equal(t, messages.State(1), heartbeat.State) } } @@ -773,10 +778,10 @@ func TestFleetConfigManager_Integration_SuccessFlow(t *testing.T) { assert.Equal(t, "mqtt://localhost:1883", token.MQTTURL) // Test that topic generation works with the JWT - topics, err := generateTopicsFromTemplate(token.AccessToken, "test-agent-123", &JWTClaims{OrgID: "integration-org"}) + topics, err := generateTopicsFromTemplate(token.AccessToken, &JWTClaims{AgentID: "test-agent-123", OrgID: "integration-org"}) require.NoError(t, err) - assert.Equal(t, "/orgs/integration-org/agents/test-agent-123/inbox", topics.Inbox) - assert.Equal(t, "/orgs/integration-org/agents/test-agent-123/outbox", topics.Outbox) + assert.Equal(t, "orgs/integration-org/agents/test-agent-123/inbox", topics.Inbox) + assert.Equal(t, "orgs/integration-org/agents/test-agent-123/outbox", topics.Outbox) // Note: We don't test the full Start() method here because it would require // a real MQTT broker, but we've verified the token retrieval and topic generation works @@ -1239,11 +1244,11 @@ func TestFleetConfigManager_SendCapabilities_CapabilitiesStructure(t *testing.T) func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { // This test verifies that the Start method correctly generates topics from JWT claims - // Create a valid JWT token with org_id and agent_id claims + // Create a valid JWT token with orb-prefixed claims used by parseJWTClaims validJWT := rawJWTWithClaims(map[string]any{ - "org_id": "integration-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "integration-org", + "orb:zone": "default", + "iat": 1516239022, }) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) @@ -1314,9 +1319,9 @@ func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { { name: "production-like JWT", jwt: rawJWTWithClaims(map[string]any{ - "org_id": "acme-corp", - "agent_id": "agent-prod-456", - "iat": 1516239022, + "orb:org_id": "acme-corp", + "orb:zone": "z1", + "iat": 1516239022, }), expectedOrg: "acme-corp", expectedAgent: "agent-prod-456", @@ -1324,9 +1329,9 @@ func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { { name: "development JWT with different values", jwt: rawJWTWithClaims(map[string]any{ - "org_id": "dev-123", - "agent_id": "dev-agent-789", - "iat": 1516239022, + "orb:org_id": "dev-123", + "orb:zone": "z2", + "iat": 1516239022, }), expectedOrg: "dev-123", expectedAgent: "dev-agent-789", @@ -1335,14 +1340,14 @@ func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.jwt, tt.expectedAgent, &JWTClaims{OrgID: tt.expectedOrg}) + topics, err := generateTopicsFromTemplate(tt.jwt, &JWTClaims{AgentID: tt.expectedAgent, OrgID: tt.expectedOrg}) require.NoError(t, err) expectedTopics := &tokenResponseTopics{ - Heartbeat: fmt.Sprintf("/orgs/%s/agents/%s/heartbeat", tt.expectedOrg, tt.expectedAgent), - Capabilities: fmt.Sprintf("/orgs/%s/agents/%s/capabilities", tt.expectedOrg, tt.expectedAgent), - Inbox: fmt.Sprintf("/orgs/%s/agents/%s/inbox", tt.expectedOrg, tt.expectedAgent), - Outbox: fmt.Sprintf("/orgs/%s/agents/%s/outbox", tt.expectedOrg, tt.expectedAgent), + Heartbeat: fmt.Sprintf("orgs/%s/agents/%s/heartbeats", tt.expectedOrg, tt.expectedAgent), + Capabilities: fmt.Sprintf("orgs/%s/agents/%s/capabilities", tt.expectedOrg, tt.expectedAgent), + Inbox: fmt.Sprintf("orgs/%s/agents/%s/inbox", tt.expectedOrg, tt.expectedAgent), + Outbox: fmt.Sprintf("orgs/%s/agents/%s/outbox", tt.expectedOrg, tt.expectedAgent), } assert.Equal(t, expectedTopics, topics) diff --git a/agent/configmgr/jwt_claims.go b/agent/configmgr/jwt_claims.go index aca442b..25f81e7 100644 --- a/agent/configmgr/jwt_claims.go +++ b/agent/configmgr/jwt_claims.go @@ -6,8 +6,10 @@ import ( // JWTClaims represents the JWT claims we extract for topic templating type JWTClaims struct { - OrgID string `json:"orb:org_id"` - Zone string `json:"orb:zone"` + AgentID string `json:"agent_id"` + OrgID string `json:"orb:org_id"` + Zone string `json:"orb:zone"` + MqttURL string `json:"orb:mqtt_url"` } // TopicClaims combines org_id from JWT with agent_id from config @@ -27,7 +29,7 @@ type TopicTemplates struct { // DefaultTopicTemplates returns the hardcoded topic templates func DefaultTopicTemplates() TopicTemplates { return TopicTemplates{ - Heartbeat: "orgs/{org_id}/agents/{client_id}/heartbeat", + Heartbeat: "orgs/{org_id}/agents/{client_id}/heartbeats", Capabilities: "orgs/{org_id}/agents/{client_id}/capabilities", Inbox: "orgs/{org_id}/agents/{client_id}/inbox", Outbox: "orgs/{org_id}/agents/{client_id}/outbox", @@ -43,11 +45,11 @@ func fillTopicTemplate(template string, claims *TopicClaims) string { } // generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id -func generateTopicsFromTemplate(tokenString string, clientID string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { +func generateTopicsFromTemplate(tokenString string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { // Combine JWT org_id with config agent_id topicClaims := &TopicClaims{ OrgID: jwtClaims.OrgID, - ClientId: clientID, + ClientId: jwtClaims.AgentID, } templates := DefaultTopicTemplates() diff --git a/agent/configmgr/jwt_claims_test.go b/agent/configmgr/jwt_claims_test.go index bceb50c..fc24721 100644 --- a/agent/configmgr/jwt_claims_test.go +++ b/agent/configmgr/jwt_claims_test.go @@ -10,7 +10,7 @@ import ( func TestDefaultTopicTemplates(t *testing.T) { templates := DefaultTopicTemplates() - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/heartbeat", templates.Heartbeat) + assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/heartbeats", templates.Heartbeat) assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/capabilities", templates.Capabilities) assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/inbox", templates.Inbox) assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/outbox", templates.Outbox) @@ -173,7 +173,7 @@ func TestGenerateTopicsFromTemplate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.tokenString, "test-agent-123", &JWTClaims{OrgID: tt.orgID}) + topics, err := generateTopicsFromTemplate(tt.tokenString, &JWTClaims{AgentID: "test-agent-123", OrgID: tt.orgID}) require.NoError(t, err) assert.Equal(t, tt.expected, topics) diff --git a/agent/configmgr/messages/fleet_messages.go b/agent/configmgr/messages/fleet_messages.go index 992b993..b871273 100644 --- a/agent/configmgr/messages/fleet_messages.go +++ b/agent/configmgr/messages/fleet_messages.go @@ -1,9 +1,50 @@ package messages +import "time" + // Heartbeat represents a periodic message sent by an agent to indicate it's alive and active +// CurrentHeartbeatSchemaVersion defines the current version of the heartbeat schema +const CurrentHeartbeatSchemaVersion = "1.0" + +// State represents the current state of an agent in the system +type State int + +// BackendStateInfo contains state information for a backend +type BackendStateInfo struct { + State string `json:"state"` + Error string `json:"error,omitempty"` + RestartCount int64 `json:"restart_count,omitempty"` + LastError string `json:"last_error,omitempty"` + LastRestartTS time.Time `json:"last_restart_ts,omitempty"` + LastRestartReason string `json:"last_restart_reason,omitempty"` +} + +// PolicyStateInfo contains state information for a policy +type PolicyStateInfo struct { + Name string `json:"name"` + Datasets []string `json:"datasets,omitempty"` + State string `json:"state"` + Error string `json:"error,omitempty"` + Version int32 `json:"version"` + LastScrapeBytes int64 `json:"last_scrape_bytes,omitempty"` + LastScrapeTS time.Time `json:"last_scrape_ts,omitempty"` + Backend string `json:"backend,omitempty"` +} + +// GroupStateInfo contains state information for a group +type GroupStateInfo struct { + GroupName string `json:"name"` + GroupChannel string `json:"channel"` +} + +// Heartbeat represents an agent heartbeat message type Heartbeat struct { - AgentID string `json:"agent_id"` - Version string `json:"version"` + SchemaVersion string `json:"schema_version"` + TimeStamp time.Time `json:"ts"` + State State `json:"state"` + BackendState map[string]BackendStateInfo `json:"backend_state"` + PolicyState map[string]PolicyStateInfo `json:"policy_state"` + GroupState map[string]GroupStateInfo `json:"group_state"` } // HeartbeatState represents the current state of an agent in the system @@ -31,8 +72,8 @@ type OrbAgentInfo struct { // BackendInfo contains version and configuration data for a specific backend type BackendInfo struct { - Version string `json:"version"` - Data map[string]interface{} `json:"data"` + Version string `json:"version"` + Data map[string]any `json:"data"` } // CurrentCapabilitiesSchemaVersion defines the current version of the capabilities schema From 65429a9ef63353f57a4dff8a7d35b7b65e4fa84e Mon Sep 17 00:00:00 2001 From: Leonardo Parente <23251360+leoparente@users.noreply.github.com> Date: Wed, 10 Sep 2025 09:02:09 -0300 Subject: [PATCH 4/8] add group --- agent/configmgr/fleet.go | 3 ++- agent/configmgr/fleet_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index bda9711..a2ba6d7 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -397,7 +397,8 @@ func (fleetManager *fleetConfigManager) getToken(ctx context.Context, tokenURL s fleetManager.logger.Debug("requesting access token", "token_url", tokenURL, "client_id", clientID) scopes := []string{ - "orb.mqtt", + "orb.mqtt:agent", + "orb.mqtt:group", } data := url.Values{} diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index 75076a8..538a088 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -431,7 +431,7 @@ func TestFleetConfigManager_GetToken_Success(t *testing.T) { err := r.ParseForm() assert.NoError(t, err) assert.Equal(t, "client_credentials", r.Form.Get("grant_type")) - assert.Contains(t, r.Form.Get("scope"), "orb.mqtt") + assert.Contains(t, r.Form.Get("scope"), "orb.mqtt:agent") // Return valid token response (no longer includes topics) response := tokenResponse{ From 480d2ca206845947df74e2b9823c14a0132b270e Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Wed, 24 Sep 2025 14:54:04 +0100 Subject: [PATCH 5/8] connect over mqtt correctly --- agent/configmgr/fleet.go | 58 +++++++++------- agent/configmgr/fleet_test.go | 24 +++++-- agent/configmgr/jwt_claims.go | 17 ++--- agent/configmgr/jwt_claims_test.go | 80 ++++++++++++---------- agent/configmgr/messages/fleet_messages.go | 3 +- 5 files changed, 107 insertions(+), 75 deletions(-) diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index a2ba6d7..e52b25a 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -45,7 +45,7 @@ type heartbeater struct { heartbeatCtx context.Context } -func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, agentID string, _ time.Time, _ messages.HeartbeatState) { +func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, _ string, _ time.Time, _ messages.HeartbeatState) { hbData := messages.Heartbeat{ SchemaVersion: messages.CurrentHeartbeatSchemaVersion, TimeStamp: time.Now().UTC(), @@ -74,7 +74,7 @@ func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, // (if any) remain accurate. hb.heartbeatCtx = ctx - hb.logger.Debug("start heartbeats routine", slog.Any("routine", ctx.Value("routine"))) + hb.logger.Debug("start heartbeats routine") hb.sendSingleHeartbeat(ctx, publishFunc, agentID, time.Now(), messages.Online) for { @@ -106,8 +106,7 @@ func newFleetConfigManager(ctx context.Context, logger *slog.Logger, pMgr policy } func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error { - ctx, _ := context.WithCancel(context.Background()) - // defer cancel() TODO: I dont think this is correct but we're cancelling the context too early so can't publish. Maybe they need to run under a different context? + ctx := context.Background() fleetManager.logger.Info("starting fleet config manager", "token_url", cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, "client_id", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, "client_secret", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret) // call the token url to get the token @@ -116,11 +115,15 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st return err } + fleetManager.logger.Debug("JWT token", "token", token.AccessToken) + jwtClaims, err := parseJWTClaims(token.AccessToken) if err != nil { return fmt.Errorf("failed to parse JWT claims: %w", err) } + fleetManager.logger.Debug("JWT claims", "jwtClaims", jwtClaims) + // generate topics from JWT claims and config agent_id using hardcoded templates topics, err := generateTopicsFromTemplate(token.AccessToken, jwtClaims) if err != nil { @@ -174,24 +177,31 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { // Extract org_id from custom claims jwtClaims := &JWTClaims{} - if agentID, ok := customClaims["orb:agent_id"].(string); ok { - jwtClaims.AgentID = agentID - } else { - return nil, fmt.Errorf("orb:agent_id claim not found or not a string in JWT token") - } if orgID, ok := customClaims["orb:org_id"].(string); ok { jwtClaims.OrgID = orgID } else { return nil, fmt.Errorf("orb:org_id claim not found or not a string in JWT token") } - if zone, ok := customClaims["orb:zone"].(string); ok { jwtClaims.Zone = zone } else { return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") } - if ext, ok := customClaims["ext"].(map[string]any); ok { - if mqttURL, ok := ext["orb:mqtt_url"].(string); ok { + if clientID, ok := customClaims["client_id"].(string); ok { + jwtClaims.ClientID = clientID + } else { + return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") + } + + if extClaims, ok := customClaims["ext"].(map[string]any); ok { + + if agentID, ok := extClaims["orb:agent_id"].(string); ok { + jwtClaims.AgentID = agentID + } else { + return nil, fmt.Errorf("orb:agent_id claim not found or not a string in JWT token") + } + + if mqttURL, ok := extClaims["orb:mqtt_url"].(string); ok { jwtClaims.MqttURL = mqttURL } } @@ -218,20 +228,20 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) { fleetManager.logger.Info("MQTT connection established", "server", serverURL.String()) - //Subscribe to "mytopic" when connection is established - _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ - Subscriptions: []paho.SubscribeOptions{ - {Topic: topics.Inbox, QoS: 1}, - }, - }) - if err != nil { - fleetManager.logger.Error("failed to subscribe", "topic", topics.Inbox, "error", err) - } else { - fleetManager.logger.Info("successfully subscribed", "topic", topics.Inbox) - } + // //Subscribe to "mytopic" when connection is established + // _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ + // Subscriptions: []paho.SubscribeOptions{ + // {Topic: topics.Inbox, QoS: 1}, + // }, + // }) + // if err != nil { + // fleetManager.logger.Error("failed to subscribe", "topic", topics.Inbox, "error", err) + // } else { + // fleetManager.logger.Info("successfully subscribed", "topic", topics.Inbox) + // } // start heartbeat loop bound to the same connection-level context - fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error { + go fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error { publishResponse, err := cm.Publish(ctx, &paho.Publish{ Topic: topics.Heartbeat, Payload: payload, diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index 538a088..da404f8 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -755,6 +755,8 @@ func TestFleetConfigManager_Integration_SuccessFlow(t *testing.T) { response := tokenResponse{ AccessToken: rawJWTWithClaims(map[string]any{ "orb:org_id": "integration-org", + "orb:zone": "default", + "client_id": "test-client-123", "iat": 1516239022, }), MQTTURL: "mqtt://localhost:1883", // Valid but non-existent @@ -772,16 +774,18 @@ func TestFleetConfigManager_Integration_SuccessFlow(t *testing.T) { require.NoError(t, err) expectedJWT := rawJWTWithClaims(map[string]any{ "orb:org_id": "integration-org", + "orb:zone": "default", + "client_id": "test-client-123", "iat": 1516239022, }) assert.Equal(t, expectedJWT, token.AccessToken) assert.Equal(t, "mqtt://localhost:1883", token.MQTTURL) // Test that topic generation works with the JWT - topics, err := generateTopicsFromTemplate(token.AccessToken, &JWTClaims{AgentID: "test-agent-123", OrgID: "integration-org"}) + topics, err := generateTopicsFromTemplate(token.AccessToken, &JWTClaims{AgentID: "test-agent-123", OrgID: "integration-org", ClientID: "test-client-123"}) require.NoError(t, err) - assert.Equal(t, "orgs/integration-org/agents/test-agent-123/inbox", topics.Inbox) - assert.Equal(t, "orgs/integration-org/agents/test-agent-123/outbox", topics.Outbox) + assert.Equal(t, "orgs/integration-org/agents/test-client-123/inbox", topics.Inbox) + assert.Equal(t, "orgs/integration-org/agents/test-client-123/outbox", topics.Outbox) // Note: We don't test the full Start() method here because it would require // a real MQTT broker, but we've verified the token retrieval and topic generation works @@ -1248,6 +1252,7 @@ func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { validJWT := rawJWTWithClaims(map[string]any{ "orb:org_id": "integration-org", "orb:zone": "default", + "client_id": "test-client", "iat": 1516239022, }) @@ -1322,6 +1327,7 @@ func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { "orb:org_id": "acme-corp", "orb:zone": "z1", "iat": 1516239022, + "client_id": "agent-prod-456", }), expectedOrg: "acme-corp", expectedAgent: "agent-prod-456", @@ -1332,15 +1338,23 @@ func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { "orb:org_id": "dev-123", "orb:zone": "z2", "iat": 1516239022, + "client_id": "agent-dev-789", }), expectedOrg: "dev-123", - expectedAgent: "dev-agent-789", + expectedAgent: "agent-dev-789", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.jwt, &JWTClaims{AgentID: tt.expectedAgent, OrgID: tt.expectedOrg}) + // Extract the client_id that would be in the JWT for proper testing + var clientID string + if tt.name == "production-like JWT" { + clientID = "agent-prod-456" + } else { + clientID = "agent-dev-789" + } + topics, err := generateTopicsFromTemplate(tt.jwt, &JWTClaims{OrgID: tt.expectedOrg, ClientID: clientID}) require.NoError(t, err) expectedTopics := &tokenResponseTopics{ diff --git a/agent/configmgr/jwt_claims.go b/agent/configmgr/jwt_claims.go index 25f81e7..5a1d053 100644 --- a/agent/configmgr/jwt_claims.go +++ b/agent/configmgr/jwt_claims.go @@ -6,16 +6,17 @@ import ( // JWTClaims represents the JWT claims we extract for topic templating type JWTClaims struct { - AgentID string `json:"agent_id"` - OrgID string `json:"orb:org_id"` - Zone string `json:"orb:zone"` - MqttURL string `json:"orb:mqtt_url"` + AgentID string `json:"agent_id"` + OrgID string `json:"orb:org_id"` + Zone string `json:"orb:zone"` + ClientID string `json:"client_id"` + MqttURL string `json:"orb:mqtt_url"` } // TopicClaims combines org_id from JWT with agent_id from config type TopicClaims struct { OrgID string - ClientId string + ClientID string } // TopicTemplates defines hardcoded topic name patterns with placeholders @@ -40,16 +41,16 @@ func DefaultTopicTemplates() TopicTemplates { func fillTopicTemplate(template string, claims *TopicClaims) string { result := template result = strings.ReplaceAll(result, "{org_id}", claims.OrgID) - result = strings.ReplaceAll(result, "{client_id}", claims.ClientId) + result = strings.ReplaceAll(result, "{client_id}", claims.ClientID) return result } // generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id -func generateTopicsFromTemplate(tokenString string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { +func generateTopicsFromTemplate(_ string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { // Combine JWT org_id with config agent_id topicClaims := &TopicClaims{ OrgID: jwtClaims.OrgID, - ClientId: jwtClaims.AgentID, + ClientID: jwtClaims.ClientID, } templates := DefaultTopicTemplates() diff --git a/agent/configmgr/jwt_claims_test.go b/agent/configmgr/jwt_claims_test.go index fc24721..e9d62ef 100644 --- a/agent/configmgr/jwt_claims_test.go +++ b/agent/configmgr/jwt_claims_test.go @@ -10,10 +10,10 @@ import ( func TestDefaultTopicTemplates(t *testing.T) { templates := DefaultTopicTemplates() - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/heartbeats", templates.Heartbeat) - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/capabilities", templates.Capabilities) - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/inbox", templates.Inbox) - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/outbox", templates.Outbox) + assert.Equal(t, "orgs/{org_id}/agents/{client_id}/heartbeats", templates.Heartbeat) + assert.Equal(t, "orgs/{org_id}/agents/{client_id}/capabilities", templates.Capabilities) + assert.Equal(t, "orgs/{org_id}/agents/{client_id}/inbox", templates.Inbox) + assert.Equal(t, "orgs/{org_id}/agents/{client_id}/outbox", templates.Outbox) } func TestFillTopicTemplate(t *testing.T) { @@ -25,19 +25,19 @@ func TestFillTopicTemplate(t *testing.T) { }{ { name: "basic substitution", - template: "/orgs/{org_id}/agents/{agent_id}/test", + template: "/orgs/{org_id}/agents/{client_id}/test", claims: &TopicClaims{ OrgID: "org123", - ClientId: "agent-456", + ClientID: "agent-456", }, expected: "/orgs/org123/agents/agent-456/test", }, { name: "multiple occurrences", - template: "{org_id}/data/{org_id}/{agent_id}", + template: "{org_id}/data/{org_id}/{client_id}", claims: &TopicClaims{ OrgID: "company1", - ClientId: "agent-789", + ClientID: "agent-789", }, expected: "company1/data/company1/agent-789", }, @@ -46,16 +46,16 @@ func TestFillTopicTemplate(t *testing.T) { template: "static/topic/name", claims: &TopicClaims{ OrgID: "org123", - ClientId: "agent-456", + ClientID: "agent-456", }, expected: "static/topic/name", }, { name: "empty claims", - template: "/orgs/{org_id}/agents/{agent_id}/test", + template: "/orgs/{org_id}/agents/{client_id}/test", claims: &TopicClaims{ OrgID: "", - ClientId: "", + ClientID: "", }, expected: "/orgs//agents//test", }, @@ -89,30 +89,34 @@ func TestParseJWTClaims(t *testing.T) { { name: "valid JWT with org_id", tokenString: rawJWTWithClaims(map[string]any{ - "org_id": "test-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "test-org", + "orb:zone": "default", + "client_id": "test-client", + "iat": 1516239022, }), expected: &JWTClaims{ - OrgID: "test-org", + OrgID: "test-org", + Zone: "default", + ClientID: "test-client", }, }, { name: "JWT missing org_id", tokenString: rawJWTWithClaims(map[string]any{ - "agent_id": "test-agent-123", - "iat": 1516239022, + "client_id": "test-client", + "iat": 1516239022, }), - expectedErr: "org_id claim not found", + expectedErr: "orb:org_id claim not found", }, { name: "JWT with non-string org_id", tokenString: rawJWTWithClaims(map[string]any{ - "org_id": 123, - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": 123, + "orb:zone": "default", + "client_id": "test-client", + "iat": 1516239022, }), - expectedErr: "org_id claim not found or not a string", + expectedErr: "orb:org_id claim not found or not a string", }, } @@ -142,38 +146,40 @@ func TestGenerateTopicsFromTemplate(t *testing.T) { { name: "valid token generates correct topics", tokenString: rawJWTWithClaims(map[string]any{ - "org_id": "test-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "test-org", + "orb:zone": "default", + "client_id": "test-client-123", + "iat": 1516239022, }), orgID: "test-org", expected: &tokenResponseTopics{ - Heartbeat: "/orgs/test-org/agents/test-agent-123/heartbeat", - Capabilities: "/orgs/test-org/agents/test-agent-123/capabilities", - Inbox: "/orgs/test-org/agents/test-agent-123/inbox", - Outbox: "/orgs/test-org/agents/test-agent-123/outbox", + Heartbeat: "orgs/test-org/agents/test-client-123/heartbeats", + Capabilities: "orgs/test-org/agents/test-client-123/capabilities", + Inbox: "orgs/test-org/agents/test-client-123/inbox", + Outbox: "orgs/test-org/agents/test-client-123/outbox", }, }, { name: "different org and agent values", tokenString: rawJWTWithClaims(map[string]any{ - "org_id": "prod-company", - "agent_id": "agent-456", - "iat": 1516239022, + "orb:org_id": "prod-company", + "orb:zone": "default", + "client_id": "prod-client-456", + "iat": 1516239022, }), orgID: "prod-company", expected: &tokenResponseTopics{ - Heartbeat: "/orgs/prod-company/agents/test-agent-123/heartbeat", - Capabilities: "/orgs/prod-company/agents/test-agent-123/capabilities", - Inbox: "/orgs/prod-company/agents/test-agent-123/inbox", - Outbox: "/orgs/prod-company/agents/test-agent-123/outbox", + Heartbeat: "orgs/prod-company/agents/test-client-123/heartbeats", + Capabilities: "orgs/prod-company/agents/test-client-123/capabilities", + Inbox: "orgs/prod-company/agents/test-client-123/inbox", + Outbox: "orgs/prod-company/agents/test-client-123/outbox", }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.tokenString, &JWTClaims{AgentID: "test-agent-123", OrgID: tt.orgID}) + topics, err := generateTopicsFromTemplate(tt.tokenString, &JWTClaims{OrgID: tt.orgID, ClientID: "test-client-123"}) require.NoError(t, err) assert.Equal(t, tt.expected, topics) diff --git a/agent/configmgr/messages/fleet_messages.go b/agent/configmgr/messages/fleet_messages.go index b871273..62be59f 100644 --- a/agent/configmgr/messages/fleet_messages.go +++ b/agent/configmgr/messages/fleet_messages.go @@ -2,10 +2,11 @@ package messages import "time" -// Heartbeat represents a periodic message sent by an agent to indicate it's alive and active // CurrentHeartbeatSchemaVersion defines the current version of the heartbeat schema const CurrentHeartbeatSchemaVersion = "1.0" +// Heartbeat represents a periodic message sent by an agent to indicate it's alive and active + // State represents the current state of an agent in the system type State int From 4120360aa4f3defccf105cc0d19f75c13832b4b4 Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Wed, 24 Sep 2025 16:37:46 +0100 Subject: [PATCH 6/8] removes debug logging --- agent/configmgr/fleet.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index e52b25a..e2c5bbe 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -115,15 +115,11 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st return err } - fleetManager.logger.Debug("JWT token", "token", token.AccessToken) - jwtClaims, err := parseJWTClaims(token.AccessToken) if err != nil { return fmt.Errorf("failed to parse JWT claims: %w", err) } - fleetManager.logger.Debug("JWT claims", "jwtClaims", jwtClaims) - // generate topics from JWT claims and config agent_id using hardcoded templates topics, err := generateTopicsFromTemplate(token.AccessToken, jwtClaims) if err != nil { From ed18bc961cab28e9135bef944020923a5487c0cc Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Thu, 25 Sep 2025 11:35:04 +0100 Subject: [PATCH 7/8] corrects topics to use agent id --- agent/configmgr/fleet.go | 60 +-------- agent/configmgr/fleet_test.go | 188 +++-------------------------- agent/configmgr/jwt_claims.go | 109 +++++++++++------ agent/configmgr/jwt_claims_test.go | 151 +++++++++++------------ 4 files changed, 156 insertions(+), 352 deletions(-) diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index e2c5bbe..7e24860 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -15,8 +15,6 @@ import ( "github.com/eclipse/paho.golang/autopaho" "github.com/eclipse/paho.golang/paho" - "github.com/go-jose/go-jose/v4" - "github.com/go-jose/go-jose/v4/jwt" "github.com/netboxlabs/orb-agent/agent/backend" "github.com/netboxlabs/orb-agent/agent/config" @@ -121,7 +119,7 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st } // generate topics from JWT claims and config agent_id using hardcoded templates - topics, err := generateTopicsFromTemplate(token.AccessToken, jwtClaims) + topics, err := generateTopicsFromTemplate(jwtClaims) if err != nil { return fmt.Errorf("failed to generate topics: %w", err) } @@ -149,62 +147,6 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st return nil } -// parseJWTClaims extracts org_id claim from a JWT token -func parseJWTClaims(tokenString string) (*JWTClaims, error) { - if tokenString == "" { - return nil, fmt.Errorf("empty token string") - } - - // Parse the JWT token without verification (since we already trust it from the token endpoint) - // We accept common signature algorithms used in JWTs - token, err := jwt.ParseSigned(tokenString, []jose.SignatureAlgorithm{jose.HS256, jose.HS384, jose.HS512, jose.RS256, jose.RS384, jose.RS512, jose.ES256, jose.ES384, jose.ES512}) - if err != nil { - return nil, fmt.Errorf("failed to parse JWT token: %w", err) - } - - var claims jwt.Claims - var customClaims map[string]any - - // Extract both standard and custom claims without verification - if err := token.UnsafeClaimsWithoutVerification(&claims, &customClaims); err != nil { - return nil, fmt.Errorf("failed to extract claims from JWT: %w", err) - } - - // Extract org_id from custom claims - jwtClaims := &JWTClaims{} - - if orgID, ok := customClaims["orb:org_id"].(string); ok { - jwtClaims.OrgID = orgID - } else { - return nil, fmt.Errorf("orb:org_id claim not found or not a string in JWT token") - } - if zone, ok := customClaims["orb:zone"].(string); ok { - jwtClaims.Zone = zone - } else { - return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") - } - if clientID, ok := customClaims["client_id"].(string); ok { - jwtClaims.ClientID = clientID - } else { - return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") - } - - if extClaims, ok := customClaims["ext"].(map[string]any); ok { - - if agentID, ok := extClaims["orb:agent_id"].(string); ok { - jwtClaims.AgentID = agentID - } else { - return nil, fmt.Errorf("orb:agent_id claim not found or not a string in JWT token") - } - - if mqttURL, ok := extClaims["orb:mqtt_url"].(string); ok { - jwtClaims.MqttURL = mqttURL - } - } - - return jwtClaims, nil -} - func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, clientID, zone string) error { // Parse the ORB URL serverURL, err := url.Parse(fleetMQTTURL) diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index da404f8..e85d290 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "fmt" "log/slog" "net/http" "net/http/httptest" @@ -567,7 +566,10 @@ func TestFleetConfigManager_Connect_ValidURL(t *testing.T) { // since we don't have a real MQTT server backends := make(map[string]backend.Backend) trt2 := tokenResponseTopics{Inbox: "test/topic"} - err := fleetManager.connect(context.Background(), "mqtt://localhost:1883", "test_token", trt2, backends, "test-agent-id", "test-zone") + // Timeout after 3 seconds + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + err := fleetManager.connect(ctx, "mqtt://localhost:1883", "test_token", trt2, backends, "test-agent-id", "test-zone") // Assert - we expect connection to fail since no server is running, // but URL parsing should succeed @@ -720,96 +722,6 @@ func TestTokenResponse_Marshaling(t *testing.T) { assert.Equal(t, original.ExpiresIn, unmarshaled.ExpiresIn) } -func TestTokenResponseTopics_Marshaling(t *testing.T) { - // Arrange - original := tokenResponseTopics{ - Inbox: "custom/inbox/topic", - Outbox: "custom/outbox/topic", - } - - // Act - jsonData, err := json.Marshal(original) - require.NoError(t, err) - - var unmarshaled tokenResponseTopics - err = json.Unmarshal(jsonData, &unmarshaled) - require.NoError(t, err) - - // Assert - assert.Equal(t, original.Inbox, unmarshaled.Inbox) - assert.Equal(t, original.Outbox, unmarshaled.Outbox) -} - -func TestFleetConfigManager_Integration_SuccessFlow(t *testing.T) { - // This test verifies the happy path integration but without actual MQTT connection - // due to complexity of mocking MQTT server - - // Arrange - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - mockPMgr := &mockPolicyManagerForFleet{} - fleetManager := newFleetConfigManager(context.Background(), logger, mockPMgr) - defer fleetManager.heartbeater.hbTicker.Stop() - - // Create mock HTTP server for successful token response - server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - response := tokenResponse{ - AccessToken: rawJWTWithClaims(map[string]any{ - "orb:org_id": "integration-org", - "orb:zone": "default", - "client_id": "test-client-123", - "iat": 1516239022, - }), - MQTTURL: "mqtt://localhost:1883", // Valid but non-existent - ExpiresIn: 3600, - } - _ = json.NewEncoder(w).Encode(response) - })) - defer server.Close() - - // Act - test token retrieval part of the flow - ctx := context.Background() - token, err := fleetManager.getToken(ctx, server.URL, "integration_client", "integration_secret") - - // Assert - token retrieval should succeed - require.NoError(t, err) - expectedJWT := rawJWTWithClaims(map[string]any{ - "orb:org_id": "integration-org", - "orb:zone": "default", - "client_id": "test-client-123", - "iat": 1516239022, - }) - assert.Equal(t, expectedJWT, token.AccessToken) - assert.Equal(t, "mqtt://localhost:1883", token.MQTTURL) - - // Test that topic generation works with the JWT - topics, err := generateTopicsFromTemplate(token.AccessToken, &JWTClaims{AgentID: "test-agent-123", OrgID: "integration-org", ClientID: "test-client-123"}) - require.NoError(t, err) - assert.Equal(t, "orgs/integration-org/agents/test-client-123/inbox", topics.Inbox) - assert.Equal(t, "orgs/integration-org/agents/test-client-123/outbox", topics.Outbox) - - // Note: We don't test the full Start() method here because it would require - // a real MQTT broker, but we've verified the token retrieval and topic generation works -} - -func TestFleetConfigManager_CompilerInterfaceCheck(t *testing.T) { - // This test ensures that fleetConfigManager implements the Manager interface - // The actual check is done at compile time with: var _ Manager = (*fleetConfigManager)(nil) - - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - mockPMgr := &mockPolicyManagerForFleet{} - fleetManager := newFleetConfigManager(context.Background(), logger, mockPMgr) - defer fleetManager.heartbeater.hbTicker.Stop() - - // Verify that fleetManager can be assigned to Manager interface - var manager Manager = fleetManager - assert.NotNil(t, manager) - - // Verify that all Manager interface methods are available - ctx := context.Background() - resultCtx := manager.GetContext(ctx) - assert.NotNil(t, resultCtx) -} - // Test edge cases for heartbeater ticker cleanup func TestFleetConfigManager_HeartbeaterTickerCleanup(t *testing.T) { // Arrange @@ -829,27 +741,6 @@ func TestFleetConfigManager_HeartbeaterTickerCleanup(t *testing.T) { assert.True(t, true, "Ticker cleanup should not cause issues") } -// Test multiple fleetConfigManager instances -func TestFleetConfigManager_MultipleInstances(t *testing.T) { - // Arrange - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - mockPMgr1 := &mockPolicyManagerForFleet{} - mockPMgr2 := &mockPolicyManagerForFleet{} - - // Act - manager1 := newFleetConfigManager(context.Background(), logger, mockPMgr1) - manager2 := newFleetConfigManager(context.Background(), logger, mockPMgr2) - - // Assert - assert.NotEqual(t, manager1, manager2, "Different instances should be created") - assert.NotEqual(t, manager1.heartbeater, manager2.heartbeater, "Each should have separate heartbeater") - assert.NotEqual(t, manager1.heartbeater.hbTicker, manager2.heartbeater.hbTicker, "Each should have separate ticker") - - // Cleanup - manager1.heartbeater.hbTicker.Stop() - manager2.heartbeater.hbTicker.Stop() -} - // mockBackend implements the Backend interface for testing type mockBackend struct { mock.Mock @@ -1248,12 +1139,17 @@ func TestFleetConfigManager_SendCapabilities_CapabilitiesStructure(t *testing.T) func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { // This test verifies that the Start method correctly generates topics from JWT claims + mqttURL := "mqtt://test.example.com:1883" // Create a valid JWT token with orb-prefixed claims used by parseJWTClaims validJWT := rawJWTWithClaims(map[string]any{ - "orb:org_id": "integration-org", - "orb:zone": "default", - "client_id": "test-client", - "iat": 1516239022, + "orb:org_id": "integration-org", + "orb:zone": "default", + "orb:agent_id": "test-agent", + "client_id": "test-client", + "iat": 1516239022, + "ext": map[string]any{ + "orb:mqtt_url": mqttURL, + }, }) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) @@ -1265,7 +1161,7 @@ func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { response := tokenResponse{ AccessToken: validJWT, - MQTTURL: "mqtt://test.example.com:1883", + MQTTURL: mqttURL, ExpiresIn: 3600, } @@ -1312,59 +1208,3 @@ func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { strings.Contains(errorMsg, "deadline"), "Expected connection-related error, got: %s", err.Error()) } - -func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { - // Test the integration of JWT parsing with topic generation using real JWT - tests := []struct { - name string - jwt string - expectedOrg string - expectedAgent string - }{ - { - name: "production-like JWT", - jwt: rawJWTWithClaims(map[string]any{ - "orb:org_id": "acme-corp", - "orb:zone": "z1", - "iat": 1516239022, - "client_id": "agent-prod-456", - }), - expectedOrg: "acme-corp", - expectedAgent: "agent-prod-456", - }, - { - name: "development JWT with different values", - jwt: rawJWTWithClaims(map[string]any{ - "orb:org_id": "dev-123", - "orb:zone": "z2", - "iat": 1516239022, - "client_id": "agent-dev-789", - }), - expectedOrg: "dev-123", - expectedAgent: "agent-dev-789", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Extract the client_id that would be in the JWT for proper testing - var clientID string - if tt.name == "production-like JWT" { - clientID = "agent-prod-456" - } else { - clientID = "agent-dev-789" - } - topics, err := generateTopicsFromTemplate(tt.jwt, &JWTClaims{OrgID: tt.expectedOrg, ClientID: clientID}) - require.NoError(t, err) - - expectedTopics := &tokenResponseTopics{ - Heartbeat: fmt.Sprintf("orgs/%s/agents/%s/heartbeats", tt.expectedOrg, tt.expectedAgent), - Capabilities: fmt.Sprintf("orgs/%s/agents/%s/capabilities", tt.expectedOrg, tt.expectedAgent), - Inbox: fmt.Sprintf("orgs/%s/agents/%s/inbox", tt.expectedOrg, tt.expectedAgent), - Outbox: fmt.Sprintf("orgs/%s/agents/%s/outbox", tt.expectedOrg, tt.expectedAgent), - } - - assert.Equal(t, expectedTopics, topics) - }) - } -} diff --git a/agent/configmgr/jwt_claims.go b/agent/configmgr/jwt_claims.go index 5a1d053..f3da645 100644 --- a/agent/configmgr/jwt_claims.go +++ b/agent/configmgr/jwt_claims.go @@ -1,65 +1,98 @@ package configmgr import ( + "fmt" "strings" + + "github.com/go-jose/go-jose/v4" + "github.com/go-jose/go-jose/v4/jwt" ) // JWTClaims represents the JWT claims we extract for topic templating type JWTClaims struct { - AgentID string `json:"agent_id"` - OrgID string `json:"orb:org_id"` - Zone string `json:"orb:zone"` - ClientID string `json:"client_id"` - MqttURL string `json:"orb:mqtt_url"` -} - -// TopicClaims combines org_id from JWT with agent_id from config -type TopicClaims struct { + AgentID string OrgID string + Zone string ClientID string + MqttURL string } -// TopicTemplates defines hardcoded topic name patterns with placeholders -type TopicTemplates struct { - Heartbeat string - Capabilities string - Inbox string - Outbox string -} +// parseJWTClaims extracts org_id claim from a JWT token +func parseJWTClaims(tokenString string) (*JWTClaims, error) { + if tokenString == "" { + return nil, fmt.Errorf("empty token string") + } + + // Parse the JWT token without verification (since we already trust it from the token endpoint) + // We accept common signature algorithms used in JWTs + token, err := jwt.ParseSigned(tokenString, []jose.SignatureAlgorithm{jose.HS256, jose.HS384, jose.HS512, jose.RS256, jose.RS384, jose.RS512, jose.ES256, jose.ES384, jose.ES512}) + if err != nil { + return nil, fmt.Errorf("failed to parse JWT token: %w", err) + } + + var claims jwt.Claims + var customClaims map[string]any + + // Extract both standard and custom claims without verification + if err := token.UnsafeClaimsWithoutVerification(&claims, &customClaims); err != nil { + return nil, fmt.Errorf("failed to extract claims from JWT: %w", err) + } + + // Extract org_id from custom claims + jwtClaims := &JWTClaims{} -// DefaultTopicTemplates returns the hardcoded topic templates -func DefaultTopicTemplates() TopicTemplates { - return TopicTemplates{ - Heartbeat: "orgs/{org_id}/agents/{client_id}/heartbeats", - Capabilities: "orgs/{org_id}/agents/{client_id}/capabilities", - Inbox: "orgs/{org_id}/agents/{client_id}/inbox", - Outbox: "orgs/{org_id}/agents/{client_id}/outbox", + if orgID, ok := customClaims["orb:org_id"].(string); ok { + jwtClaims.OrgID = orgID + } else { + return nil, fmt.Errorf("orb:org_id claim not found or not a string in JWT token") } + if zone, ok := customClaims["orb:zone"].(string); ok { + jwtClaims.Zone = zone + } else { + return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") + } + if clientID, ok := customClaims["client_id"].(string); ok { + jwtClaims.ClientID = clientID + } else { + return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") + } + if agentID, ok := customClaims["orb:agent_id"].(string); ok { + jwtClaims.AgentID = agentID + } else { + return nil, fmt.Errorf("orb:agent_id claim not found or not a string in JWT token") + } + + if extClaims, ok := customClaims["ext"].(map[string]any); ok { + if mqttURL, ok := extClaims["orb:mqtt_url"].(string); ok { + jwtClaims.MqttURL = mqttURL + } + } + + return jwtClaims, nil } // fillTopicTemplate replaces placeholders in a topic template with actual values -func fillTopicTemplate(template string, claims *TopicClaims) string { +func fillTopicTemplate(template string, claims *JWTClaims) string { result := template result = strings.ReplaceAll(result, "{org_id}", claims.OrgID) - result = strings.ReplaceAll(result, "{client_id}", claims.ClientID) + result = strings.ReplaceAll(result, "{agent_id}", claims.AgentID) return result } -// generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id -func generateTopicsFromTemplate(_ string, jwtClaims *JWTClaims) (*tokenResponseTopics, error) { - // Combine JWT org_id with config agent_id - topicClaims := &TopicClaims{ - OrgID: jwtClaims.OrgID, - ClientID: jwtClaims.ClientID, - } - - templates := DefaultTopicTemplates() +const ( + heartbeatTemplate = "orgs/{org_id}/agents/{agent_id}/heartbeats" + capabilitiesTemplate = "orgs/{org_id}/agents/{agent_id}/capabilities" + inboxTemplate = "orgs/{org_id}/agents/{agent_id}/inbox" + outboxTemplate = "orgs/{org_id}/agents/{agent_id}/outbox" +) +// generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id +func generateTopicsFromTemplate(jwtClaims *JWTClaims) (*tokenResponseTopics, error) { topics := &tokenResponseTopics{ - Heartbeat: fillTopicTemplate(templates.Heartbeat, topicClaims), - Capabilities: fillTopicTemplate(templates.Capabilities, topicClaims), - Inbox: fillTopicTemplate(templates.Inbox, topicClaims), - Outbox: fillTopicTemplate(templates.Outbox, topicClaims), + Heartbeat: fillTopicTemplate(heartbeatTemplate, jwtClaims), + Capabilities: fillTopicTemplate(capabilitiesTemplate, jwtClaims), + Inbox: fillTopicTemplate(inboxTemplate, jwtClaims), + Outbox: fillTopicTemplate(outboxTemplate, jwtClaims), } return topics, nil diff --git a/agent/configmgr/jwt_claims_test.go b/agent/configmgr/jwt_claims_test.go index e9d62ef..e3d4ddb 100644 --- a/agent/configmgr/jwt_claims_test.go +++ b/agent/configmgr/jwt_claims_test.go @@ -7,55 +7,46 @@ import ( "github.com/stretchr/testify/require" ) -func TestDefaultTopicTemplates(t *testing.T) { - templates := DefaultTopicTemplates() - - assert.Equal(t, "orgs/{org_id}/agents/{client_id}/heartbeats", templates.Heartbeat) - assert.Equal(t, "orgs/{org_id}/agents/{client_id}/capabilities", templates.Capabilities) - assert.Equal(t, "orgs/{org_id}/agents/{client_id}/inbox", templates.Inbox) - assert.Equal(t, "orgs/{org_id}/agents/{client_id}/outbox", templates.Outbox) -} - func TestFillTopicTemplate(t *testing.T) { tests := []struct { name string template string - claims *TopicClaims + claims *JWTClaims expected string }{ { name: "basic substitution", - template: "/orgs/{org_id}/agents/{client_id}/test", - claims: &TopicClaims{ - OrgID: "org123", - ClientID: "agent-456", + template: "/orgs/{org_id}/agents/{agent_id}/test", + claims: &JWTClaims{ + OrgID: "org123", + AgentID: "agent-456", }, expected: "/orgs/org123/agents/agent-456/test", }, { name: "multiple occurrences", - template: "{org_id}/data/{org_id}/{client_id}", - claims: &TopicClaims{ - OrgID: "company1", - ClientID: "agent-789", + template: "{org_id}/data/{org_id}/{agent_id}", + claims: &JWTClaims{ + OrgID: "company1", + AgentID: "agent-789", }, expected: "company1/data/company1/agent-789", }, { name: "no placeholders", template: "static/topic/name", - claims: &TopicClaims{ - OrgID: "org123", - ClientID: "agent-456", + claims: &JWTClaims{ + OrgID: "org123", + AgentID: "agent-456", }, expected: "static/topic/name", }, { name: "empty claims", - template: "/orgs/{org_id}/agents/{client_id}/test", - claims: &TopicClaims{ - OrgID: "", - ClientID: "", + template: "/orgs/{org_id}/agents/{agent_id}/test", + claims: &JWTClaims{ + OrgID: "", + AgentID: "", }, expected: "/orgs//agents//test", }, @@ -89,34 +80,64 @@ func TestParseJWTClaims(t *testing.T) { { name: "valid JWT with org_id", tokenString: rawJWTWithClaims(map[string]any{ - "orb:org_id": "test-org", - "orb:zone": "default", - "client_id": "test-client", - "iat": 1516239022, + "orb:org_id": "test-org", + "orb:zone": "default", + "orb:agent_id": "test-agent", + "client_id": "test-client", + "iat": 1516239022, + "ext": map[string]any{ + "orb:mqtt_url": "mqtt://test.example.com:1883", + }, }), expected: &JWTClaims{ OrgID: "test-org", Zone: "default", ClientID: "test-client", + AgentID: "test-agent", + MqttURL: "mqtt://test.example.com:1883", }, }, { name: "JWT missing org_id", tokenString: rawJWTWithClaims(map[string]any{ - "client_id": "test-client", - "iat": 1516239022, + "client_id": "test-client", + "iat": 1516239022, + "orb:agent_id": "test-agent", + "orb:zone": "default", }), expectedErr: "orb:org_id claim not found", }, { name: "JWT with non-string org_id", tokenString: rawJWTWithClaims(map[string]any{ - "orb:org_id": 123, - "orb:zone": "default", + "orb:org_id": 123, + "orb:zone": "default", + "client_id": "test-client", + "iat": 1516239022, + "orb:agent_id": "test-agent", + }), + expectedErr: "orb:org_id claim not found or not a string", + }, + { + name: "JWT missing agent_id", + tokenString: rawJWTWithClaims(map[string]any{ "client_id": "test-client", "iat": 1516239022, + "orb:org_id": "test-org", + "orb:zone": "default", }), - expectedErr: "orb:org_id claim not found or not a string", + expectedErr: "orb:agent_id claim not found", + }, + { + name: "JWT with non-string agent_id", + tokenString: rawJWTWithClaims(map[string]any{ + "orb:agent_id": 123, + "orb:zone": "default", + "client_id": "test-client", + "iat": 1516239022, + "orb:org_id": "test-org", + }), + expectedErr: "orb:agent_id claim not found or not a string", }, } @@ -138,20 +159,15 @@ func TestParseJWTClaims(t *testing.T) { func TestGenerateTopicsFromTemplate(t *testing.T) { tests := []struct { - name string - tokenString string - orgID string - expected *tokenResponseTopics + name string + orgID string + agentID string + expected *tokenResponseTopics }{ { - name: "valid token generates correct topics", - tokenString: rawJWTWithClaims(map[string]any{ - "orb:org_id": "test-org", - "orb:zone": "default", - "client_id": "test-client-123", - "iat": 1516239022, - }), - orgID: "test-org", + name: "valid token generates correct topics", + orgID: "test-org", + agentID: "test-client-123", expected: &tokenResponseTopics{ Heartbeat: "orgs/test-org/agents/test-client-123/heartbeats", Capabilities: "orgs/test-org/agents/test-client-123/capabilities", @@ -160,51 +176,24 @@ func TestGenerateTopicsFromTemplate(t *testing.T) { }, }, { - name: "different org and agent values", - tokenString: rawJWTWithClaims(map[string]any{ - "orb:org_id": "prod-company", - "orb:zone": "default", - "client_id": "prod-client-456", - "iat": 1516239022, - }), - orgID: "prod-company", + name: "different org and agent values", + orgID: "prod-company", + agentID: "test-agent-123", expected: &tokenResponseTopics{ - Heartbeat: "orgs/prod-company/agents/test-client-123/heartbeats", - Capabilities: "orgs/prod-company/agents/test-client-123/capabilities", - Inbox: "orgs/prod-company/agents/test-client-123/inbox", - Outbox: "orgs/prod-company/agents/test-client-123/outbox", + Heartbeat: "orgs/prod-company/agents/test-agent-123/heartbeats", + Capabilities: "orgs/prod-company/agents/test-agent-123/capabilities", + Inbox: "orgs/prod-company/agents/test-agent-123/inbox", + Outbox: "orgs/prod-company/agents/test-agent-123/outbox", }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.tokenString, &JWTClaims{OrgID: tt.orgID, ClientID: "test-client-123"}) + topics, err := generateTopicsFromTemplate(&JWTClaims{OrgID: tt.orgID, AgentID: tt.agentID}) require.NoError(t, err) assert.Equal(t, tt.expected, topics) }) } } - -func TestJWTClaimsStruct(t *testing.T) { - claims := JWTClaims{ - OrgID: "test-org-id", - } - - assert.Equal(t, "test-org-id", claims.OrgID) -} - -func TestTopicTemplatesStruct(t *testing.T) { - templates := TopicTemplates{ - Heartbeat: "custom/heartbeat/{org_id}", - Capabilities: "custom/capabilities/{agent_id}", - Inbox: "custom/inbox", - Outbox: "custom/outbox", - } - - assert.Equal(t, "custom/heartbeat/{org_id}", templates.Heartbeat) - assert.Equal(t, "custom/capabilities/{agent_id}", templates.Capabilities) - assert.Equal(t, "custom/inbox", templates.Inbox) - assert.Equal(t, "custom/outbox", templates.Outbox) -} From bfbeff046299f3c81524078f46fb80b679cd2a1a Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Thu, 25 Sep 2025 11:38:14 +0100 Subject: [PATCH 8/8] corrects error message --- agent/configmgr/jwt_claims.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/configmgr/jwt_claims.go b/agent/configmgr/jwt_claims.go index f3da645..5fcf6ed 100644 --- a/agent/configmgr/jwt_claims.go +++ b/agent/configmgr/jwt_claims.go @@ -54,7 +54,7 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { if clientID, ok := customClaims["client_id"].(string); ok { jwtClaims.ClientID = clientID } else { - return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") + return nil, fmt.Errorf("client_id claim not found or not a string in JWT token") } if agentID, ok := customClaims["orb:agent_id"].(string); ok { jwtClaims.AgentID = agentID