diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index be07812..7e24860 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -43,10 +43,11 @@ type heartbeater struct { heartbeatCtx context.Context } -func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, agentID string, _ time.Time, _ messages.HeartbeatState) { +func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, _ string, _ time.Time, _ messages.HeartbeatState) { hbData := messages.Heartbeat{ - AgentID: agentID, - Version: "1.0.0", + SchemaVersion: messages.CurrentHeartbeatSchemaVersion, + TimeStamp: time.Now().UTC(), + State: 1, } body, err := json.Marshal(hbData) @@ -71,7 +72,7 @@ func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, // (if any) remain accurate. hb.heartbeatCtx = ctx - hb.logger.Debug("start heartbeats routine", slog.Any("routine", ctx.Value("routine"))) + hb.logger.Debug("start heartbeats routine") hb.sendSingleHeartbeat(ctx, publishFunc, agentID, time.Now(), messages.Online) for { @@ -103,17 +104,22 @@ func newFleetConfigManager(ctx context.Context, logger *slog.Logger, pMgr policy } func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() + fleetManager.logger.Info("starting fleet config manager", "token_url", cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, "client_id", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, "client_secret", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret) // call the token url to get the token token, err := fleetManager.getToken(ctx, cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret) if err != nil { return err } + jwtClaims, err := parseJWTClaims(token.AccessToken) + if err != nil { + return fmt.Errorf("failed to parse JWT claims: %w", err) + } + // generate topics from JWT claims and config agent_id using hardcoded templates - topics, err := generateTopicsFromTemplate(token.AccessToken, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID) + topics, err := generateTopicsFromTemplate(jwtClaims) if err != nil { return fmt.Errorf("failed to generate topics: %w", err) } @@ -125,20 +131,23 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st "outbox_topic", topics.Outbox) // use MQTT URL from token response or fallback to config - mqttURL := token.MQTTURL + mqttURL := jwtClaims.MqttURL if mqttURL == "" { mqttURL = cfg.OrbAgent.ConfigManager.Sources.Fleet.MQTTURL } + if mqttURL == "" { + return fmt.Errorf("no MQTT URL provided in token response or config") + } // use the generated topics to connect over MQTT v5 - err = fleetManager.connect(mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID) + err = fleetManager.connect(ctx, mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, jwtClaims.Zone) if err != nil { return err } return nil } -func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error { +func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, clientID, zone string) error { // Parse the ORB URL serverURL, err := url.Parse(fleetMQTTURL) if err != nil { @@ -146,9 +155,6 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, return err } - // Configure autopaho client - clientID := "orb-agent-" + time.Now().Format("20060102150405") - cfg := autopaho.ClientConfig{ ServerUrls: []*url.URL{serverURL}, KeepAlive: 30, @@ -160,7 +166,7 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) { fleetManager.logger.Info("MQTT connection established", "server", serverURL.String()) - // Subscribe to "mytopic" when connection is established + // //Subscribe to "mytopic" when connection is established // _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ // Subscriptions: []paho.SubscribeOptions{ // {Topic: topics.Inbox, QoS: 1}, @@ -173,11 +179,11 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, // } // start heartbeat loop bound to the same connection-level context - fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error { + go fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error { publishResponse, err := cm.Publish(ctx, &paho.Publish{ Topic: topics.Heartbeat, Payload: payload, - QoS: 1, + QoS: 0, Retain: false, }) if err != nil { @@ -194,7 +200,7 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, "payload", string(payload), ) return nil - }, agentID) + }, clientID) go fleetManager.sendCapabilities(ctx, backends, func(ctx context.Context, payload []byte) error { _, err := cm.Publish(ctx, &paho.Publish{ @@ -234,12 +240,9 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, // Set authentication if token is provided if token != "" { - cfg.ConnectUsername = "token" // Using token as username, adjust as needed for your auth scheme + fleetManager.logger.Info("setting MQTT authentication", "client_id", clientID, "zone", zone) + cfg.ConnectUsername = fmt.Sprintf("%s:%s", zone, clientID) cfg.ConnectPassword = []byte(token) - } else { - // TODO: remove these temporary credentials - cfg.ConnectUsername = "admin" - cfg.ConnectPassword = []byte("admin") } // Create and start the connection manager using the long-lived context. @@ -264,12 +267,6 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, return nil } -// connect is a backward-compatibility shim that invokes connectWithContext with -// the fleet manager's root context. -func (fleetManager *fleetConfigManager) connect(fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error { - return fleetManager.connectWithContext(fleetManager.heartbeater.heartbeatCtx, fleetMQTTURL, token, topics, backends, agentID) -} - func (fleetManager *fleetConfigManager) sendCapabilities(ctx context.Context, backends map[string]backend.Backend, publishFunc func(ctx context.Context, payload []byte) error) { capabilities := messages.Capabilities{ SchemaVersion: messages.CurrentCapabilitiesSchemaVersion, @@ -348,7 +345,8 @@ func (fleetManager *fleetConfigManager) getToken(ctx context.Context, tokenURL s fleetManager.logger.Debug("requesting access token", "token_url", tokenURL, "client_id", clientID) scopes := []string{ - "orb.mqtt", + "orb.mqtt:agent", + "orb.mqtt:group", } data := url.Values{} @@ -356,6 +354,7 @@ func (fleetManager *fleetConfigManager) getToken(ctx context.Context, tokenURL s data.Set("scope", strings.Join(scopes, " ")) data.Set("client_id", clientID) data.Set("client_secret", clientSecret) + data.Set("audience", "orb") fleetManager.logger.Debug("sending token request", "url", tokenURL, "data", data, "client_id", clientID) //, "client_secret", clientSecret) diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index c312c4f..e85d290 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "fmt" "log/slog" "net/http" "net/http/httptest" @@ -105,21 +104,23 @@ func TestHeartbeater_SendSingleHeartbeat_Success(t *testing.T) { ctx := context.Background() testTime := time.Now() - // Expected heartbeat data - expectedHeartbeat := messages.Heartbeat{ - AgentID: "test-agent-id", - Version: "1.0.0", - } - expectedPayload, _ := json.Marshal(expectedHeartbeat) - - // Set up mock expectations - mockPublish.On("Publish", ctx, expectedPayload).Return(nil) + // We don't assert exact bytes; validate the marshalled heartbeat content + mockPublish.On("Publish", ctx, mock.AnythingOfType("[]uint8")).Return(nil) // Act hb.sendSingleHeartbeat(ctx, mockPublish.Publish, "test-agent-id", testTime, messages.Online) - // Assert - mockPublish.AssertExpectations(t) + // Assert: ensure one publish happened with a valid heartbeat payload + calls := mockPublish.Calls + require.Len(t, calls, 1) + payload, ok := calls[0].Arguments.Get(1).([]byte) + require.True(t, ok) + + var hbMsg messages.Heartbeat + require.NoError(t, json.Unmarshal(payload, &hbMsg)) + assert.Equal(t, messages.CurrentHeartbeatSchemaVersion, hbMsg.SchemaVersion) + assert.Equal(t, messages.State(1), hbMsg.State) + assert.False(t, hbMsg.TimeStamp.IsZero()) } func TestHeartbeater_SendSingleHeartbeat_PublishError(t *testing.T) { @@ -166,8 +167,9 @@ func TestHeartbeater_SendSingleHeartbeat_HeartbeatContent(t *testing.T) { err := json.Unmarshal(capturedPayload, &heartbeat) require.NoError(t, err) - assert.Equal(t, "test-agent-id", heartbeat.AgentID) - assert.Equal(t, "1.0.0", heartbeat.Version) + assert.Equal(t, messages.CurrentHeartbeatSchemaVersion, heartbeat.SchemaVersion) + assert.Equal(t, messages.State(1), heartbeat.State) + assert.False(t, heartbeat.TimeStamp.IsZero()) } func TestHeartbeater_SendHeartbeats_InitialHeartbeat(t *testing.T) { @@ -379,13 +381,15 @@ func TestHeartbeater_SendHeartbeats_HeartbeatStates(t *testing.T) { assert.GreaterOrEqual(t, len(payloadsCopy), 2, "Should have at least initial and final heartbeats") - // Verify all payloads are valid heartbeat messages + // Verify all payloads are valid heartbeat messages and contain expected fields for i, payload := range payloadsCopy { var heartbeat messages.Heartbeat err := json.Unmarshal(payload, &heartbeat) require.NoError(t, err, "Heartbeat %d should be valid JSON", i) - assert.Equal(t, "test-agent-id", heartbeat.AgentID) - assert.Equal(t, "1.0.0", heartbeat.Version) + assert.Equal(t, messages.CurrentHeartbeatSchemaVersion, heartbeat.SchemaVersion) + assert.False(t, heartbeat.TimeStamp.IsZero()) + // Current implementation always sends Online state (1) + assert.Equal(t, messages.State(1), heartbeat.State) } } @@ -426,7 +430,7 @@ func TestFleetConfigManager_GetToken_Success(t *testing.T) { err := r.ParseForm() assert.NoError(t, err) assert.Equal(t, "client_credentials", r.Form.Get("grant_type")) - assert.Contains(t, r.Form.Get("scope"), "orb.mqtt") + assert.Contains(t, r.Form.Get("scope"), "orb.mqtt:agent") // Return valid token response (no longer includes topics) response := tokenResponse{ @@ -544,7 +548,7 @@ func TestFleetConfigManager_Connect_InvalidURL(t *testing.T) { // Act with invalid URL backends := make(map[string]backend.Backend) trt := tokenResponseTopics{Inbox: "test/topic"} - err := fleetManager.connect("://invalid-url", "test_token", trt, backends, "test-agent-id") + err := fleetManager.connect(context.Background(), "://invalid-url", "test_token", trt, backends, "test-agent-id", "test-zone") // Assert assert.Error(t, err) @@ -562,7 +566,10 @@ func TestFleetConfigManager_Connect_ValidURL(t *testing.T) { // since we don't have a real MQTT server backends := make(map[string]backend.Backend) trt2 := tokenResponseTopics{Inbox: "test/topic"} - err := fleetManager.connect("mqtt://localhost:1883", "test_token", trt2, backends, "test-agent-id") + // Timeout after 3 seconds + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + err := fleetManager.connect(ctx, "mqtt://localhost:1883", "test_token", trt2, backends, "test-agent-id", "test-zone") // Assert - we expect connection to fail since no server is running, // but URL parsing should succeed @@ -618,9 +625,8 @@ func TestFleetConfigManager_Start_ConnectError(t *testing.T) { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { response := tokenResponse{ AccessToken: rawJWTWithClaims(map[string]any{ - "org_id": "test-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "test-org", + "iat": 1516239022, }), MQTTURL: "://invalid-mqtt-url", // Invalid MQTT URL ExpiresIn: 3600, @@ -716,94 +722,6 @@ func TestTokenResponse_Marshaling(t *testing.T) { assert.Equal(t, original.ExpiresIn, unmarshaled.ExpiresIn) } -func TestTokenResponseTopics_Marshaling(t *testing.T) { - // Arrange - original := tokenResponseTopics{ - Inbox: "custom/inbox/topic", - Outbox: "custom/outbox/topic", - } - - // Act - jsonData, err := json.Marshal(original) - require.NoError(t, err) - - var unmarshaled tokenResponseTopics - err = json.Unmarshal(jsonData, &unmarshaled) - require.NoError(t, err) - - // Assert - assert.Equal(t, original.Inbox, unmarshaled.Inbox) - assert.Equal(t, original.Outbox, unmarshaled.Outbox) -} - -func TestFleetConfigManager_Integration_SuccessFlow(t *testing.T) { - // This test verifies the happy path integration but without actual MQTT connection - // due to complexity of mocking MQTT server - - // Arrange - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - mockPMgr := &mockPolicyManagerForFleet{} - fleetManager := newFleetConfigManager(context.Background(), logger, mockPMgr) - defer fleetManager.heartbeater.hbTicker.Stop() - - // Create mock HTTP server for successful token response - server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - response := tokenResponse{ - AccessToken: rawJWTWithClaims(map[string]any{ - "org_id": "integration-org", - "agent_id": "test-agent-123", - "iat": 1516239022, - }), - MQTTURL: "mqtt://localhost:1883", // Valid but non-existent - ExpiresIn: 3600, - } - _ = json.NewEncoder(w).Encode(response) - })) - defer server.Close() - - // Act - test token retrieval part of the flow - ctx := context.Background() - token, err := fleetManager.getToken(ctx, server.URL, "integration_client", "integration_secret") - - // Assert - token retrieval should succeed - require.NoError(t, err) - expectedJWT := rawJWTWithClaims(map[string]any{ - "org_id": "integration-org", - "agent_id": "test-agent-123", - "iat": 1516239022, - }) - assert.Equal(t, expectedJWT, token.AccessToken) - assert.Equal(t, "mqtt://localhost:1883", token.MQTTURL) - - // Test that topic generation works with the JWT - topics, err := generateTopicsFromTemplate(token.AccessToken, "test-agent-123") - require.NoError(t, err) - assert.Equal(t, "/orgs/integration-org/agents/test-agent-123/inbox", topics.Inbox) - assert.Equal(t, "/orgs/integration-org/agents/test-agent-123/outbox", topics.Outbox) - - // Note: We don't test the full Start() method here because it would require - // a real MQTT broker, but we've verified the token retrieval and topic generation works -} - -func TestFleetConfigManager_CompilerInterfaceCheck(t *testing.T) { - // This test ensures that fleetConfigManager implements the Manager interface - // The actual check is done at compile time with: var _ Manager = (*fleetConfigManager)(nil) - - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - mockPMgr := &mockPolicyManagerForFleet{} - fleetManager := newFleetConfigManager(context.Background(), logger, mockPMgr) - defer fleetManager.heartbeater.hbTicker.Stop() - - // Verify that fleetManager can be assigned to Manager interface - var manager Manager = fleetManager - assert.NotNil(t, manager) - - // Verify that all Manager interface methods are available - ctx := context.Background() - resultCtx := manager.GetContext(ctx) - assert.NotNil(t, resultCtx) -} - // Test edge cases for heartbeater ticker cleanup func TestFleetConfigManager_HeartbeaterTickerCleanup(t *testing.T) { // Arrange @@ -823,27 +741,6 @@ func TestFleetConfigManager_HeartbeaterTickerCleanup(t *testing.T) { assert.True(t, true, "Ticker cleanup should not cause issues") } -// Test multiple fleetConfigManager instances -func TestFleetConfigManager_MultipleInstances(t *testing.T) { - // Arrange - logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) - mockPMgr1 := &mockPolicyManagerForFleet{} - mockPMgr2 := &mockPolicyManagerForFleet{} - - // Act - manager1 := newFleetConfigManager(context.Background(), logger, mockPMgr1) - manager2 := newFleetConfigManager(context.Background(), logger, mockPMgr2) - - // Assert - assert.NotEqual(t, manager1, manager2, "Different instances should be created") - assert.NotEqual(t, manager1.heartbeater, manager2.heartbeater, "Each should have separate heartbeater") - assert.NotEqual(t, manager1.heartbeater.hbTicker, manager2.heartbeater.hbTicker, "Each should have separate ticker") - - // Cleanup - manager1.heartbeater.hbTicker.Stop() - manager2.heartbeater.hbTicker.Stop() -} - // mockBackend implements the Backend interface for testing type mockBackend struct { mock.Mock @@ -1242,11 +1139,17 @@ func TestFleetConfigManager_SendCapabilities_CapabilitiesStructure(t *testing.T) func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { // This test verifies that the Start method correctly generates topics from JWT claims - // Create a valid JWT token with org_id and agent_id claims + mqttURL := "mqtt://test.example.com:1883" + // Create a valid JWT token with orb-prefixed claims used by parseJWTClaims validJWT := rawJWTWithClaims(map[string]any{ - "org_id": "integration-org", - "agent_id": "test-agent-123", - "iat": 1516239022, + "orb:org_id": "integration-org", + "orb:zone": "default", + "orb:agent_id": "test-agent", + "client_id": "test-client", + "iat": 1516239022, + "ext": map[string]any{ + "orb:mqtt_url": mqttURL, + }, }) logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) @@ -1258,7 +1161,7 @@ func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { response := tokenResponse{ AccessToken: validJWT, - MQTTURL: "mqtt://test.example.com:1883", + MQTTURL: mqttURL, ExpiresIn: 3600, } @@ -1305,50 +1208,3 @@ func TestFleetConfigManager_Start_WithJWTTopicGeneration(t *testing.T) { strings.Contains(errorMsg, "deadline"), "Expected connection-related error, got: %s", err.Error()) } - -func TestGenerateTopicsFromTemplate_Integration(t *testing.T) { - // Test the integration of JWT parsing with topic generation using real JWT - tests := []struct { - name string - jwt string - expectedOrg string - expectedAgent string - }{ - { - name: "production-like JWT", - jwt: rawJWTWithClaims(map[string]any{ - "org_id": "acme-corp", - "agent_id": "agent-prod-456", - "iat": 1516239022, - }), - expectedOrg: "acme-corp", - expectedAgent: "agent-prod-456", - }, - { - name: "development JWT with different values", - jwt: rawJWTWithClaims(map[string]any{ - "org_id": "dev-123", - "agent_id": "dev-agent-789", - "iat": 1516239022, - }), - expectedOrg: "dev-123", - expectedAgent: "dev-agent-789", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.jwt, tt.expectedAgent) - require.NoError(t, err) - - expectedTopics := &tokenResponseTopics{ - Heartbeat: fmt.Sprintf("/orgs/%s/agents/%s/heartbeat", tt.expectedOrg, tt.expectedAgent), - Capabilities: fmt.Sprintf("/orgs/%s/agents/%s/capabilities", tt.expectedOrg, tt.expectedAgent), - Inbox: fmt.Sprintf("/orgs/%s/agents/%s/inbox", tt.expectedOrg, tt.expectedAgent), - Outbox: fmt.Sprintf("/orgs/%s/agents/%s/outbox", tt.expectedOrg, tt.expectedAgent), - } - - assert.Equal(t, expectedTopics, topics) - }) - } -} diff --git a/agent/configmgr/jwt_claims.go b/agent/configmgr/jwt_claims.go index 5a0a2b0..5fcf6ed 100644 --- a/agent/configmgr/jwt_claims.go +++ b/agent/configmgr/jwt_claims.go @@ -10,31 +10,11 @@ import ( // JWTClaims represents the JWT claims we extract for topic templating type JWTClaims struct { - OrgID string `json:"org_id"` -} - -// TopicClaims combines org_id from JWT with agent_id from config -type TopicClaims struct { - OrgID string - AgentID string -} - -// TopicTemplates defines hardcoded topic name patterns with placeholders -type TopicTemplates struct { - Heartbeat string - Capabilities string - Inbox string - Outbox string -} - -// DefaultTopicTemplates returns the hardcoded topic templates -func DefaultTopicTemplates() TopicTemplates { - return TopicTemplates{ - Heartbeat: "/orgs/{org_id}/agents/{agent_id}/heartbeat", - Capabilities: "/orgs/{org_id}/agents/{agent_id}/capabilities", - Inbox: "/orgs/{org_id}/agents/{agent_id}/inbox", - Outbox: "/orgs/{org_id}/agents/{agent_id}/outbox", - } + AgentID string + OrgID string + Zone string + ClientID string + MqttURL string } // parseJWTClaims extracts org_id claim from a JWT token @@ -51,7 +31,7 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { } var claims jwt.Claims - var customClaims map[string]interface{} + var customClaims map[string]any // Extract both standard and custom claims without verification if err := token.UnsafeClaimsWithoutVerification(&claims, &customClaims); err != nil { @@ -61,43 +41,58 @@ func parseJWTClaims(tokenString string) (*JWTClaims, error) { // Extract org_id from custom claims jwtClaims := &JWTClaims{} - if orgID, ok := customClaims["org_id"].(string); ok { + if orgID, ok := customClaims["orb:org_id"].(string); ok { jwtClaims.OrgID = orgID } else { - return nil, fmt.Errorf("org_id claim not found or not a string in JWT token") + return nil, fmt.Errorf("orb:org_id claim not found or not a string in JWT token") + } + if zone, ok := customClaims["orb:zone"].(string); ok { + jwtClaims.Zone = zone + } else { + return nil, fmt.Errorf("orb:zone claim not found or not a string in JWT token") + } + if clientID, ok := customClaims["client_id"].(string); ok { + jwtClaims.ClientID = clientID + } else { + return nil, fmt.Errorf("client_id claim not found or not a string in JWT token") + } + if agentID, ok := customClaims["orb:agent_id"].(string); ok { + jwtClaims.AgentID = agentID + } else { + return nil, fmt.Errorf("orb:agent_id claim not found or not a string in JWT token") + } + + if extClaims, ok := customClaims["ext"].(map[string]any); ok { + if mqttURL, ok := extClaims["orb:mqtt_url"].(string); ok { + jwtClaims.MqttURL = mqttURL + } } return jwtClaims, nil } // fillTopicTemplate replaces placeholders in a topic template with actual values -func fillTopicTemplate(template string, claims *TopicClaims) string { +func fillTopicTemplate(template string, claims *JWTClaims) string { result := template result = strings.ReplaceAll(result, "{org_id}", claims.OrgID) result = strings.ReplaceAll(result, "{agent_id}", claims.AgentID) return result } -// generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id -func generateTopicsFromTemplate(tokenString string, agentID string) (*tokenResponseTopics, error) { - jwtClaims, err := parseJWTClaims(tokenString) - if err != nil { - return nil, fmt.Errorf("failed to parse JWT claims: %w", err) - } - - // Combine JWT org_id with config agent_id - topicClaims := &TopicClaims{ - OrgID: jwtClaims.OrgID, - AgentID: agentID, - } - - templates := DefaultTopicTemplates() +const ( + heartbeatTemplate = "orgs/{org_id}/agents/{agent_id}/heartbeats" + capabilitiesTemplate = "orgs/{org_id}/agents/{agent_id}/capabilities" + inboxTemplate = "orgs/{org_id}/agents/{agent_id}/inbox" + outboxTemplate = "orgs/{org_id}/agents/{agent_id}/outbox" +) +// generateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id +func generateTopicsFromTemplate(jwtClaims *JWTClaims) (*tokenResponseTopics, error) { topics := &tokenResponseTopics{ - Heartbeat: fillTopicTemplate(templates.Heartbeat, topicClaims), - Capabilities: fillTopicTemplate(templates.Capabilities, topicClaims), - Inbox: fillTopicTemplate(templates.Inbox, topicClaims), - Outbox: fillTopicTemplate(templates.Outbox, topicClaims), + Heartbeat: fillTopicTemplate(heartbeatTemplate, jwtClaims), + Capabilities: fillTopicTemplate(capabilitiesTemplate, jwtClaims), + Inbox: fillTopicTemplate(inboxTemplate, jwtClaims), + Outbox: fillTopicTemplate(outboxTemplate, jwtClaims), } return topics, nil diff --git a/agent/configmgr/jwt_claims_test.go b/agent/configmgr/jwt_claims_test.go index 463c5fc..e3d4ddb 100644 --- a/agent/configmgr/jwt_claims_test.go +++ b/agent/configmgr/jwt_claims_test.go @@ -7,26 +7,17 @@ import ( "github.com/stretchr/testify/require" ) -func TestDefaultTopicTemplates(t *testing.T) { - templates := DefaultTopicTemplates() - - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/heartbeat", templates.Heartbeat) - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/capabilities", templates.Capabilities) - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/inbox", templates.Inbox) - assert.Equal(t, "/orgs/{org_id}/agents/{agent_id}/outbox", templates.Outbox) -} - func TestFillTopicTemplate(t *testing.T) { tests := []struct { name string template string - claims *TopicClaims + claims *JWTClaims expected string }{ { name: "basic substitution", template: "/orgs/{org_id}/agents/{agent_id}/test", - claims: &TopicClaims{ + claims: &JWTClaims{ OrgID: "org123", AgentID: "agent-456", }, @@ -35,7 +26,7 @@ func TestFillTopicTemplate(t *testing.T) { { name: "multiple occurrences", template: "{org_id}/data/{org_id}/{agent_id}", - claims: &TopicClaims{ + claims: &JWTClaims{ OrgID: "company1", AgentID: "agent-789", }, @@ -44,7 +35,7 @@ func TestFillTopicTemplate(t *testing.T) { { name: "no placeholders", template: "static/topic/name", - claims: &TopicClaims{ + claims: &JWTClaims{ OrgID: "org123", AgentID: "agent-456", }, @@ -53,7 +44,7 @@ func TestFillTopicTemplate(t *testing.T) { { name: "empty claims", template: "/orgs/{org_id}/agents/{agent_id}/test", - claims: &TopicClaims{ + claims: &JWTClaims{ OrgID: "", AgentID: "", }, @@ -88,30 +79,65 @@ func TestParseJWTClaims(t *testing.T) { }, { name: "valid JWT with org_id", - // This is a sample JWT with the claims we need (created using jwt.io) - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"org_id":"test-org","agent_id":"test-agent-123","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOiJ0ZXN0LW9yZyIsImFnZW50X2lkIjoidGVzdC1hZ2VudC0xMjMiLCJpYXQiOjE1MTYyMzkwMjJ9.8U8W8j_2ZwV2GvO3QcI6LJl2a8XGrHPDYS9hM2y4k2I", + tokenString: rawJWTWithClaims(map[string]any{ + "orb:org_id": "test-org", + "orb:zone": "default", + "orb:agent_id": "test-agent", + "client_id": "test-client", + "iat": 1516239022, + "ext": map[string]any{ + "orb:mqtt_url": "mqtt://test.example.com:1883", + }, + }), expected: &JWTClaims{ - OrgID: "test-org", + OrgID: "test-org", + Zone: "default", + ClientID: "test-client", + AgentID: "test-agent", + MqttURL: "mqtt://test.example.com:1883", }, }, { name: "JWT missing org_id", - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"agent_id":"test-agent-123","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhZ2VudF9pZCI6InRlc3QtYWdlbnQtMTIzIiwiaWF0IjoxNTE2MjM5MDIyfQ.QZ3K8j9QqgV2HvP4RdJ7MKm3b9YHsIPEZT0iN3z5l3J", - expectedErr: "org_id claim not found", + tokenString: rawJWTWithClaims(map[string]any{ + "client_id": "test-client", + "iat": 1516239022, + "orb:agent_id": "test-agent", + "orb:zone": "default", + }), + expectedErr: "orb:org_id claim not found", }, { name: "JWT with non-string org_id", - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"org_id":123,"agent_id":"test-agent-123","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOjEyMywiYWdlbnRfaWQiOiJ0ZXN0LWFnZW50LTEyMyIsImlhdCI6MTUxNjIzOTAyMn0.Vh9X8l2LwO6PvR5QfK9NMn4c0ZIsJGF1aU3oO4z6m5K", - expectedErr: "org_id claim not found or not a string", + tokenString: rawJWTWithClaims(map[string]any{ + "orb:org_id": 123, + "orb:zone": "default", + "client_id": "test-client", + "iat": 1516239022, + "orb:agent_id": "test-agent", + }), + expectedErr: "orb:org_id claim not found or not a string", + }, + { + name: "JWT missing agent_id", + tokenString: rawJWTWithClaims(map[string]any{ + "client_id": "test-client", + "iat": 1516239022, + "orb:org_id": "test-org", + "orb:zone": "default", + }), + expectedErr: "orb:agent_id claim not found", + }, + { + name: "JWT with non-string agent_id", + tokenString: rawJWTWithClaims(map[string]any{ + "orb:agent_id": 123, + "orb:zone": "default", + "client_id": "test-client", + "iat": 1516239022, + "orb:org_id": "test-org", + }), + expectedErr: "orb:agent_id claim not found or not a string", }, } @@ -133,82 +159,41 @@ func TestParseJWTClaims(t *testing.T) { func TestGenerateTopicsFromTemplate(t *testing.T) { tests := []struct { - name string - tokenString string - expectedErr string - expected *tokenResponseTopics + name string + orgID string + agentID string + expected *tokenResponseTopics }{ { - name: "empty token", - tokenString: "", - expectedErr: "failed to parse JWT claims", - }, - { - name: "invalid token", - tokenString: "invalid.token", - expectedErr: "failed to parse JWT claims", - }, - { - name: "valid token generates correct topics", - // Valid JWT with org_id="test-org" and agent_id="test-agent-123" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOiJ0ZXN0LW9yZyIsImFnZW50X2lkIjoidGVzdC1hZ2VudC0xMjMiLCJpYXQiOjE1MTYyMzkwMjJ9.8U8W8j_2ZwV2GvO3QcI6LJl2a8XGrHPDYS9hM2y4k2I", + name: "valid token generates correct topics", + orgID: "test-org", + agentID: "test-client-123", expected: &tokenResponseTopics{ - Heartbeat: "/orgs/test-org/agents/test-agent-123/heartbeat", - Capabilities: "/orgs/test-org/agents/test-agent-123/capabilities", - Inbox: "/orgs/test-org/agents/test-agent-123/inbox", - Outbox: "/orgs/test-org/agents/test-agent-123/outbox", + Heartbeat: "orgs/test-org/agents/test-client-123/heartbeats", + Capabilities: "orgs/test-org/agents/test-client-123/capabilities", + Inbox: "orgs/test-org/agents/test-client-123/inbox", + Outbox: "orgs/test-org/agents/test-client-123/outbox", }, }, { - name: "different org and agent values", - // JWT with org_id="prod-company" and agent_id="agent-456" - // Header: {"alg":"HS256","typ":"JWT"} - // Payload: {"org_id":"prod-company","agent_id":"agent-456","iat":1516239022} - // Secret: "your-256-bit-secret" - tokenString: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJvcmdfaWQiOiJwcm9kLWNvbXBhbnkiLCJhZ2VudF9pZCI6ImFnZW50LTQ1NiIsImlhdCI6MTUxNjIzOTAyMn0.KJ2NtX5zY8R9LwP3QfM6NOn5d1aJsHGE2bV4oQ6z7mL", + name: "different org and agent values", + orgID: "prod-company", + agentID: "test-agent-123", expected: &tokenResponseTopics{ - Heartbeat: "/orgs/prod-company/agents/test-agent-123/heartbeat", - Capabilities: "/orgs/prod-company/agents/test-agent-123/capabilities", - Inbox: "/orgs/prod-company/agents/test-agent-123/inbox", - Outbox: "/orgs/prod-company/agents/test-agent-123/outbox", + Heartbeat: "orgs/prod-company/agents/test-agent-123/heartbeats", + Capabilities: "orgs/prod-company/agents/test-agent-123/capabilities", + Inbox: "orgs/prod-company/agents/test-agent-123/inbox", + Outbox: "orgs/prod-company/agents/test-agent-123/outbox", }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - topics, err := generateTopicsFromTemplate(tt.tokenString, "test-agent-123") + topics, err := generateTopicsFromTemplate(&JWTClaims{OrgID: tt.orgID, AgentID: tt.agentID}) - if tt.expectedErr != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), tt.expectedErr) - assert.Nil(t, topics) - } else { - require.NoError(t, err) - assert.Equal(t, tt.expected, topics) - } + require.NoError(t, err) + assert.Equal(t, tt.expected, topics) }) } } - -func TestJWTClaimsStruct(t *testing.T) { - claims := JWTClaims{ - OrgID: "test-org-id", - } - - assert.Equal(t, "test-org-id", claims.OrgID) -} - -func TestTopicTemplatesStruct(t *testing.T) { - templates := TopicTemplates{ - Heartbeat: "custom/heartbeat/{org_id}", - Capabilities: "custom/capabilities/{agent_id}", - Inbox: "custom/inbox", - Outbox: "custom/outbox", - } - - assert.Equal(t, "custom/heartbeat/{org_id}", templates.Heartbeat) - assert.Equal(t, "custom/capabilities/{agent_id}", templates.Capabilities) - assert.Equal(t, "custom/inbox", templates.Inbox) - assert.Equal(t, "custom/outbox", templates.Outbox) -} diff --git a/agent/configmgr/messages/fleet_messages.go b/agent/configmgr/messages/fleet_messages.go index 992b993..62be59f 100644 --- a/agent/configmgr/messages/fleet_messages.go +++ b/agent/configmgr/messages/fleet_messages.go @@ -1,9 +1,51 @@ package messages +import "time" + +// CurrentHeartbeatSchemaVersion defines the current version of the heartbeat schema +const CurrentHeartbeatSchemaVersion = "1.0" + // Heartbeat represents a periodic message sent by an agent to indicate it's alive and active + +// State represents the current state of an agent in the system +type State int + +// BackendStateInfo contains state information for a backend +type BackendStateInfo struct { + State string `json:"state"` + Error string `json:"error,omitempty"` + RestartCount int64 `json:"restart_count,omitempty"` + LastError string `json:"last_error,omitempty"` + LastRestartTS time.Time `json:"last_restart_ts,omitempty"` + LastRestartReason string `json:"last_restart_reason,omitempty"` +} + +// PolicyStateInfo contains state information for a policy +type PolicyStateInfo struct { + Name string `json:"name"` + Datasets []string `json:"datasets,omitempty"` + State string `json:"state"` + Error string `json:"error,omitempty"` + Version int32 `json:"version"` + LastScrapeBytes int64 `json:"last_scrape_bytes,omitempty"` + LastScrapeTS time.Time `json:"last_scrape_ts,omitempty"` + Backend string `json:"backend,omitempty"` +} + +// GroupStateInfo contains state information for a group +type GroupStateInfo struct { + GroupName string `json:"name"` + GroupChannel string `json:"channel"` +} + +// Heartbeat represents an agent heartbeat message type Heartbeat struct { - AgentID string `json:"agent_id"` - Version string `json:"version"` + SchemaVersion string `json:"schema_version"` + TimeStamp time.Time `json:"ts"` + State State `json:"state"` + BackendState map[string]BackendStateInfo `json:"backend_state"` + PolicyState map[string]PolicyStateInfo `json:"policy_state"` + GroupState map[string]GroupStateInfo `json:"group_state"` } // HeartbeatState represents the current state of an agent in the system @@ -31,8 +73,8 @@ type OrbAgentInfo struct { // BackendInfo contains version and configuration data for a specific backend type BackendInfo struct { - Version string `json:"version"` - Data map[string]interface{} `json:"data"` + Version string `json:"version"` + Data map[string]any `json:"data"` } // CurrentCapabilitiesSchemaVersion defines the current version of the capabilities schema