From 1fa0881f83e0332043954276baeb553e85744226 Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Fri, 21 Nov 2025 16:53:19 +0000 Subject: [PATCH 1/6] configure to use otlp bridge --- agent/agent.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/agent/agent.go b/agent/agent.go index 2eaa15c..3343213 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -197,6 +197,36 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err return err } + if a.config.OrbAgent.ConfigManager.Active == "fleet" { + if commonBackend, exists := a.config.OrbAgent.Backends["common"]; exists { + if commonMap, ok := commonBackend.(map[string]any); ok { + if otlpSection, ok := commonMap["otlp"].(map[string]any); ok { + grpcURL, _ := otlpSection["grpc"].(string) + if grpcURL != "" { + a.logger.Warn("Overriding OTLP gRPC URL for fleet config manager", "url", grpcURL) + } + otlpSection["grpc"] = "localhost:4317" + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + + } else { + // otlp section doesn't exist, create it + commonMap["otlp"] = map[string]any{ + "grpc": "localhost:4317", + } + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + } + } + } else { + // common backend doesn't exist, create it with otlp config + a.config.OrbAgent.Backends["common"] = map[string]any{ + "otlp": map[string]any{ + "grpc": "localhost:4317", + }, + } + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + } + } + if err = a.startBackends(agentCtx, a.config.OrbAgent.Backends, a.config.OrbAgent.Labels); err != nil { return err } From 99eb075bb974c1f83fe5c4b70a79eb03d03c1824 Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Fri, 21 Nov 2025 16:58:52 +0000 Subject: [PATCH 2/6] adds tests --- agent/agent_test.go | 199 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) diff --git a/agent/agent_test.go b/agent/agent_test.go index 0b216c2..28ce66c 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -7,10 +7,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/netboxlabs/orb-agent/agent/backend" "github.com/netboxlabs/orb-agent/agent/config" "github.com/netboxlabs/orb-agent/agent/configmgr" + "github.com/netboxlabs/orb-agent/agent/policies" ) // mockConfigManager implements configmgr.Manager for testing Stop delegation @@ -40,3 +42,200 @@ func TestAgentStop_DelegatesToConfigManagerStop(t *testing.T) { assert.True(t, mockMgr.stopCalled, "expected configManager.Stop to be called") } + +// mockPolicyManager implements policymgr.PolicyManager for testing +type mockPolicyManager struct { + repo policies.PolicyRepo +} + +func (m *mockPolicyManager) ManagePolicy(_ config.PolicyPayload) {} +func (m *mockPolicyManager) RemovePolicyDataset(_ string, _ string, _ backend.Backend) {} +func (m *mockPolicyManager) GetPolicyState() ([]policies.PolicyData, error) { + return nil, nil +} +func (m *mockPolicyManager) GetRepo() policies.PolicyRepo { + return m.repo +} +func (m *mockPolicyManager) ApplyBackendPolicies(_ backend.Backend) error { + return nil +} +func (m *mockPolicyManager) RemoveBackendPolicies(_ backend.Backend, _ bool) error { + return nil +} +func (m *mockPolicyManager) RemovePolicy(_ string, _ string, _ string) error { + return nil +} + +// mockSecretsManager implements secretsmgr.Manager for testing +type mockSecretsManager struct{} + +func (m *mockSecretsManager) Start(_ context.Context) error { + return nil +} +func (m *mockSecretsManager) RegisterUpdatePoliciesCallback(_ func(map[string]bool)) {} +func (m *mockSecretsManager) SolvePolicySecrets(payload config.PolicyPayload) (config.PolicyPayload, error) { + return payload, nil +} +func (m *mockSecretsManager) SolveConfigSecrets(backends map[string]any, configManager config.ManagerConfig) (map[string]any, config.ManagerConfig, error) { + return backends, configManager, nil +} + +func TestStart_FleetConfig_OverridesExistingOTLPGrpcURL(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{ + "common": map[string]any{ + "otlp": map[string]any{ + "grpc": "original:4317", + }, + }, + }, + ConfigManager: config.ManagerConfig{ + Active: "fleet", + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start will fail when trying to start backends, but we can check the config before that + err = orbAgent.Start(ctx, cancel) + // We expect an error because there are no actual backends configured + // But the important thing is that the config was modified + require.Error(t, err) + + // Verify the config was modified by checking backendsCommon which is set in startBackends + // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value + assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be overridden to localhost:4317") +} + +func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{ + "common": map[string]any{ + "other": "value", + }, + }, + ConfigManager: config.ManagerConfig{ + Active: "fleet", + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = orbAgent.Start(ctx, cancel) + require.Error(t, err) // Expected to fail when starting backends + + // Verify the config was modified by checking backendsCommon which is set in startBackends + // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value + assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be set to localhost:4317") +} + +func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{}, + ConfigManager: config.ManagerConfig{ + Active: "fleet", + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = orbAgent.Start(ctx, cancel) + require.Error(t, err) // Expected to fail when starting backends + + // Verify the config was modified by checking backendsCommon which is set in startBackends + // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value + assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be set to localhost:4317") +} + +func TestStart_NonFleetConfig_DoesNotModifyConfig(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + originalGrpcURL := "original:4317" + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{ + "common": map[string]any{ + "otlp": map[string]any{ + "grpc": originalGrpcURL, + }, + }, + }, + ConfigManager: config.ManagerConfig{ + Active: "local", // Not fleet + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = orbAgent.Start(ctx, cancel) + require.Error(t, err) // Expected to fail when starting backends + + // Verify the config was NOT modified by checking backendsCommon which is set in startBackends + // For non-fleet config, the original value should remain + assert.Equal(t, originalGrpcURL, orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should remain unchanged for non-fleet config") +} From a9f7b53e7039049fc60a6c067ed177a4be03342f Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Mon, 24 Nov 2025 14:58:07 +0000 Subject: [PATCH 3/6] adds telemetry topic --- agent/agent_test.go | 5 ++++ agent/configmgr/fleet.go | 8 ++++-- .../configmgr/fleet/connection_hooks_test.go | 2 +- agent/configmgr/fleet/jwt_claims_test.go | 4 +++ agent/configmgr/fleet/topics.go | 3 +++ agent/configmgr/fleet/topics_test.go | 12 +++++++++ agent/configmgr/fleet_test.go | 18 ++++++++++--- agent/otlpbridge/server.go | 25 +++++++++++++++---- 8 files changed, 65 insertions(+), 12 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 28ce66c..6b54dc9 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -53,15 +53,19 @@ func (m *mockPolicyManager) RemovePolicyDataset(_ string, _ string, _ backend.Ba func (m *mockPolicyManager) GetPolicyState() ([]policies.PolicyData, error) { return nil, nil } + func (m *mockPolicyManager) GetRepo() policies.PolicyRepo { return m.repo } + func (m *mockPolicyManager) ApplyBackendPolicies(_ backend.Backend) error { return nil } + func (m *mockPolicyManager) RemoveBackendPolicies(_ backend.Backend, _ bool) error { return nil } + func (m *mockPolicyManager) RemovePolicy(_ string, _ string, _ string) error { return nil } @@ -76,6 +80,7 @@ func (m *mockSecretsManager) RegisterUpdatePoliciesCallback(_ func(map[string]bo func (m *mockSecretsManager) SolvePolicySecrets(payload config.PolicyPayload) (config.PolicyPayload, error) { return payload, nil } + func (m *mockSecretsManager) SolveConfigSecrets(backends map[string]any, configManager config.ManagerConfig) (map[string]any, config.ManagerConfig, error) { return backends, configManager, nil } diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index 1a45de5..2aa7007 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -102,7 +102,8 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st "capabilities_topic", topics.Capabilities, "inbox_topic", topics.Inbox, "outbox_topic", topics.Outbox, - "otlp_topic", topics.Ingest) + "otlp_topic", topics.Ingest, + "telemetry_topic", topics.Telemetry) connectionDetails := fleet.ConnectionDetails{ MQTTURL: jwtClaims.MqttURL, @@ -177,7 +178,10 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st pub := otlpbridge.NewCMAdapterPublisher(cm) fleetManager.otlpBridge.SetPublisher(pub) fleetManager.otlpBridge.SetIngestTopic(topics.Ingest) - fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) + fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry) + fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", + slog.String("ingest_topic", topics.Ingest), + slog.String("telemetry_topic", topics.Telemetry)) }) // Start goroutine to handle reconnect requests (JWT refresh) diff --git a/agent/configmgr/fleet/connection_hooks_test.go b/agent/configmgr/fleet/connection_hooks_test.go index 5733e07..c42a74c 100644 --- a/agent/configmgr/fleet/connection_hooks_test.go +++ b/agent/configmgr/fleet/connection_hooks_test.go @@ -56,7 +56,7 @@ func TestConnect_StoresTopicsBeforeConnecting(t *testing.T) { MQTTURL: "mqtt://localhost:1883", Token: "", AgentID: "agent-1", - Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x"}, + Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x", Telemetry: "telemetry/x"}, ClientID: "client-1", Zone: "zone-a", } diff --git a/agent/configmgr/fleet/jwt_claims_test.go b/agent/configmgr/fleet/jwt_claims_test.go index 8025ded..653236e 100644 --- a/agent/configmgr/fleet/jwt_claims_test.go +++ b/agent/configmgr/fleet/jwt_claims_test.go @@ -173,6 +173,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) { Capabilities: "orgs/test-org/agents/test-client-123/capabilities", Inbox: "orgs/test-org/agents/test-client-123/inbox", Outbox: "orgs/test-org/agents/test-client-123/outbox", + Ingest: "orgs/test-org/agents/test-client-123/ingest", + Telemetry: "orgs/test-org/agents/test-client-123/telemetry", }, }, { @@ -184,6 +186,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) { Capabilities: "orgs/prod-company/agents/test-agent-123/capabilities", Inbox: "orgs/prod-company/agents/test-agent-123/inbox", Outbox: "orgs/prod-company/agents/test-agent-123/outbox", + Ingest: "orgs/prod-company/agents/test-agent-123/ingest", + Telemetry: "orgs/prod-company/agents/test-agent-123/telemetry", }, }, } diff --git a/agent/configmgr/fleet/topics.go b/agent/configmgr/fleet/topics.go index 2e9ec18..b830129 100644 --- a/agent/configmgr/fleet/topics.go +++ b/agent/configmgr/fleet/topics.go @@ -18,6 +18,7 @@ const ( inboxTemplate = "orgs/{org_id}/agents/{agent_id}/inbox" outboxTemplate = "orgs/{org_id}/agents/{agent_id}/outbox" ingestTemplate = "orgs/{org_id}/agents/{agent_id}/ingest" + telemetryTemplate = "orgs/{org_id}/agents/{agent_id}/telemetry" groupsTemplate = "orgs/{org_id}/groups/{group_id}" ) @@ -29,6 +30,7 @@ type TokenResponseTopics struct { Inbox string `json:"inbox"` Outbox string `json:"outbox"` Ingest string `json:"ingest"` + Telemetry string `json:"telemetry"` } // GenerateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id @@ -39,6 +41,7 @@ func GenerateTopicsFromTemplate(jwtClaims *JWTClaims) (*TokenResponseTopics, err Inbox: fillTopicTemplate(inboxTemplate, jwtClaims), Outbox: fillTopicTemplate(outboxTemplate, jwtClaims), Ingest: fillTopicTemplate(ingestTemplate, jwtClaims), + Telemetry: fillTopicTemplate(telemetryTemplate, jwtClaims), }, nil } diff --git a/agent/configmgr/fleet/topics_test.go b/agent/configmgr/fleet/topics_test.go index 875fb3f..d7d7bbd 100644 --- a/agent/configmgr/fleet/topics_test.go +++ b/agent/configmgr/fleet/topics_test.go @@ -13,3 +13,15 @@ func TestGenerateTopicsFromTemplate_IncludesIngest(t *testing.T) { t.Fatalf("expected ingest topic %q, got %q", expected, topics.Ingest) } } + +func TestGenerateTopicsFromTemplate_IncludesTelemetry(t *testing.T) { + claims := &JWTClaims{OrgID: "org-123", AgentID: "agent-abc"} + topics, err := GenerateTopicsFromTemplate(claims) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + expected := "orgs/org-123/agents/agent-abc/telemetry" + if topics.Telemetry != expected { + t.Fatalf("expected telemetry topic %q, got %q", expected, topics.Telemetry) + } +} diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index a90da68..586198e 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -721,7 +721,10 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing. pub := otlpbridge.NewCMAdapterPublisher(cm) fleetManager.otlpBridge.SetPublisher(pub) fleetManager.otlpBridge.SetIngestTopic(topics.Ingest) - fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) + fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry) + fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", + slog.String("ingest_topic", topics.Ingest), + slog.String("telemetry_topic", topics.Telemetry)) } // Register the hook @@ -729,7 +732,8 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing. // Simulate first connection ready event topics := fleet.TokenResponseTopics{ - Ingest: "test/otlp/topic", + Ingest: "test/otlp/topic", + Telemetry: "test/telemetry/topic", } // Call the hook manually (simulating first connection) @@ -738,6 +742,7 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing. // Verify bridge was initialized require.NotNil(t, fleetManager.otlpBridge, "bridge should be initialized after first hook call") assert.Equal(t, "test/otlp/topic", fleetManager.otlpBridge.GetIngestTopic(), "bridge should have correct ingest topic") + assert.Equal(t, "test/telemetry/topic", fleetManager.otlpBridge.GetTelemetryTopic(), "bridge should have correct telemetry topic") // Cleanup if fleetManager.otlpBridge != nil { @@ -791,7 +796,10 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin pub := otlpbridge.NewCMAdapterPublisher(cm) fleetManager.otlpBridge.SetPublisher(pub) fleetManager.otlpBridge.SetIngestTopic(topics.Ingest) - fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) + fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry) + fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", + slog.String("ingest_topic", topics.Ingest), + slog.String("telemetry_topic", topics.Telemetry)) } // Register the hook @@ -799,7 +807,8 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin // Simulate reconnection ready event topics := fleet.TokenResponseTopics{ - Ingest: "test/otlp/topic/reconnect", + Ingest: "test/otlp/topic/reconnect", + Telemetry: "test/telemetry/topic/reconnect", } // Call the hook manually (simulating reconnection) @@ -808,6 +817,7 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin // Verify bridge was NOT recreated (same instance) assert.Equal(t, originalBridge, fleetManager.otlpBridge, "bridge should not be recreated on reconnect") assert.Equal(t, "test/otlp/topic/reconnect", fleetManager.otlpBridge.GetIngestTopic(), "bridge should have updated ingest topic") + assert.Equal(t, "test/telemetry/topic/reconnect", fleetManager.otlpBridge.GetTelemetryTopic(), "bridge should have updated telemetry topic") // Cleanup if fleetManager.otlpBridge != nil { diff --git a/agent/otlpbridge/server.go b/agent/otlpbridge/server.go index 95b5c39..b016150 100644 --- a/agent/otlpbridge/server.go +++ b/agent/otlpbridge/server.go @@ -25,11 +25,12 @@ type BridgeServer struct { closeOnce sync.Once // Shared runtime state - mu sync.RWMutex - publisher Publisher - ingestTopic string - policyRepo policies.PolicyRepo - logger *slog.Logger + mu sync.RWMutex + publisher Publisher + ingestTopic string + telemetryTopic string + policyRepo policies.PolicyRepo + logger *slog.Logger } // NewBridgeServer builds a BridgeServer but does not start it. @@ -80,6 +81,20 @@ func (s *BridgeServer) GetIngestTopic() string { return s.ingestTopic } +// SetTelemetryTopic sets the telemetry topic for publishing. +func (s *BridgeServer) SetTelemetryTopic(topic string) { + s.mu.Lock() + defer s.mu.Unlock() + s.telemetryTopic = topic +} + +// GetTelemetryTopic returns the current telemetry topic (for handlers). +func (s *BridgeServer) GetTelemetryTopic() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.telemetryTopic +} + // GetPolicyRepo returns the policy repo (for handlers). func (s *BridgeServer) GetPolicyRepo() policies.PolicyRepo { s.mu.RLock() From b89d7d5794325ab287b92949722095255e204c72 Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Thu, 27 Nov 2025 13:14:23 +0000 Subject: [PATCH 4/6] ingest data to diode --- agent/agent.go | 7 +- agent/agent_test.go | 6 +- agent/configmgr/fleet.go | 2 +- agent/otlpbridge/handlers.go | 186 +++++++++++++++++++++++------------ 4 files changed, 132 insertions(+), 69 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 3343213..cd5dded 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -198,6 +198,7 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err } if a.config.OrbAgent.ConfigManager.Active == "fleet" { + const otlpBridgeEndpoint = "grpc://localhost:4317" if commonBackend, exists := a.config.OrbAgent.Backends["common"]; exists { if commonMap, ok := commonBackend.(map[string]any); ok { if otlpSection, ok := commonMap["otlp"].(map[string]any); ok { @@ -205,13 +206,13 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err if grpcURL != "" { a.logger.Warn("Overriding OTLP gRPC URL for fleet config manager", "url", grpcURL) } - otlpSection["grpc"] = "localhost:4317" + otlpSection["grpc"] = otlpBridgeEndpoint a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") } else { // otlp section doesn't exist, create it commonMap["otlp"] = map[string]any{ - "grpc": "localhost:4317", + "grpc": otlpBridgeEndpoint, } a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") } @@ -220,7 +221,7 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err // common backend doesn't exist, create it with otlp config a.config.OrbAgent.Backends["common"] = map[string]any{ "otlp": map[string]any{ - "grpc": "localhost:4317", + "grpc": otlpBridgeEndpoint, }, } a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") diff --git a/agent/agent_test.go b/agent/agent_test.go index 6b54dc9..d1173af 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -126,7 +126,7 @@ func TestStart_FleetConfig_OverridesExistingOTLPGrpcURL(t *testing.T) { // Verify the config was modified by checking backendsCommon which is set in startBackends // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value - assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be overridden to localhost:4317") + assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc) } func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) { @@ -165,7 +165,7 @@ func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) { // Verify the config was modified by checking backendsCommon which is set in startBackends // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value - assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be set to localhost:4317") + assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc) } func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) { @@ -200,7 +200,7 @@ func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) { // Verify the config was modified by checking backendsCommon which is set in startBackends // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value - assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be set to localhost:4317") + assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc) } func TestStart_NonFleetConfig_DoesNotModifyConfig(t *testing.T) { diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index 2aa7007..842cdf3 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -158,7 +158,7 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st fleetManager.logger.Info("MQTT connection ready, initializing OTLP bridge") bridgeConfig := otlpbridge.BridgeConfig{ ListenAddr: ":4317", - Encoding: "protobuf", + Encoding: "json", } var err error fleetManager.otlpBridge, err = otlpbridge.NewBridgeServer(bridgeConfig, fleetManager.policyManager.GetRepo(), fleetManager.logger) diff --git a/agent/otlpbridge/handlers.go b/agent/otlpbridge/handlers.go index 9c7bdf5..32c5d03 100644 --- a/agent/otlpbridge/handlers.go +++ b/agent/otlpbridge/handlers.go @@ -9,31 +9,35 @@ import ( collectormetrics "go.opentelemetry.io/proto/otlp/collector/metrics/v1" collectortrace "go.opentelemetry.io/proto/otlp/collector/trace/v1" commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + + "github.com/netboxlabs/orb-agent/agent/policies" ) +const diodePolicyNameAttributeKey = "diode.metadata.policy_name" + // Trace service handler type traceServer struct { bridge *BridgeServer collectortrace.UnimplementedTraceServiceServer } -func (s *traceServer) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) { - pub := s.bridge.GetPublisher() - if pub == nil { - return nil, fmt.Errorf("publisher not yet initialized") - } - topic := s.bridge.GetIngestTopic() - if topic == "" { - return nil, fmt.Errorf("topic not yet initialized") - } - - payload, err := s.bridge.enc.Marshal(req) - if err != nil { - return nil, err - } - if err := pub.Publish(ctx, topic, payload); err != nil { - return nil, err - } +func (s *traceServer) Export(_ context.Context, _ *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) { + // pub := s.bridge.GetPublisher() + // if pub == nil { + // return nil, fmt.Errorf("publisher not yet initialized") + // } + // topic := s.bridge.GetIngestTopic() + // if topic == "" { + // return nil, fmt.Errorf("topic not yet initialized") + // } + + // payload, err := s.bridge.enc.Marshal(req) + // if err != nil { + // return nil, err + // } + // if err := pub.Publish(ctx, topic, payload); err != nil { + // return nil, err + // } return &collectortrace.ExportTraceServiceResponse{}, nil } @@ -43,23 +47,23 @@ type metricsServer struct { collectormetrics.UnimplementedMetricsServiceServer } -func (s *metricsServer) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) { - pub := s.bridge.GetPublisher() - if pub == nil { - return nil, fmt.Errorf("publisher not yet initialized") - } - topic := s.bridge.GetIngestTopic() - if topic == "" { - return nil, fmt.Errorf("topic not yet initialized") - } - - payload, err := s.bridge.enc.Marshal(req) - if err != nil { - return nil, err - } - if err := pub.Publish(ctx, topic, payload); err != nil { - return nil, err - } +func (s *metricsServer) Export(_ context.Context, _ *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) { + // pub := s.bridge.GetPublisher() + // if pub == nil { + // return nil, fmt.Errorf("publisher not yet initialized") + // } + // topic := s.bridge.GetIngestTopic() + // if topic == "" { + // return nil, fmt.Errorf("topic not yet initialized") + // } + + // payload, err := s.bridge.enc.Marshal(req) + // if err != nil { + // return nil, err + // } + // if err := pub.Publish(ctx, topic, payload); err != nil { + // return nil, err + // } return &collectormetrics.ExportMetricsServiceResponse{}, nil } @@ -74,56 +78,114 @@ func (s *logsServer) Export(ctx context.Context, req *collectorlogs.ExportLogsSe if pub == nil { return nil, fmt.Errorf("publisher not yet initialized") } + if s.isIngestRequest(req) { + repo := s.bridge.GetPolicyRepo() + enrichLogsWithDatasets(req, repo) + s.bridge.logger.Info("ingesting enriched logs with dataset_ids", "request", req) + err := s.publishToIngestTopic(ctx, req, pub) + if err != nil { + return nil, err + } + } else { + err := s.publishToTelemetryTopic(ctx, req, pub) + if err != nil { + return nil, err + } + } + return &collectorlogs.ExportLogsServiceResponse{}, nil +} + +// isIngestRequest checks if the request contains a policy_name attribute in resource or scope attributes +func (s *logsServer) isIngestRequest(req *collectorlogs.ExportLogsServiceRequest) bool { + for _, rl := range req.ResourceLogs { + if rl == nil { + continue + } + // Check Resource attributes first + if rl.Resource != nil && rl.Resource.Attributes != nil { + for _, attr := range rl.Resource.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + return true + } + } + } + // Also check Scope attributes for backward compatibility + for _, sl := range rl.ScopeLogs { + if sl == nil || sl.Scope == nil || sl.Scope.Attributes == nil { + continue + } + for _, attr := range sl.Scope.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + return true + } + } + } + } + return false +} + +func (s *logsServer) publishToIngestTopic(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest, pub Publisher) error { topic := s.bridge.GetIngestTopic() if topic == "" { - return nil, fmt.Errorf("topic not yet initialized") + return fmt.Errorf("ingest topic not yet initialized") } - repo := s.bridge.GetPolicyRepo() - // Enrich logs with dataset_ids based on policy_name - if repo != nil { - enrichLogsWithDatasets(req, repo) + return s.publish(ctx, req, pub, topic) +} + +func (s *logsServer) publishToTelemetryTopic(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest, pub Publisher) error { + topic := s.bridge.GetTelemetryTopic() + if topic == "" { + return fmt.Errorf("telemetrytopic not yet initialized") } + return s.publish(ctx, req, pub, topic) +} + +func (s *logsServer) publish(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest, pub Publisher, topic string) error { payload, err := s.bridge.enc.Marshal(req) if err != nil { - return nil, err + return err } if err := pub.Publish(ctx, topic, payload); err != nil { - return nil, err + return err } - return &collectorlogs.ExportLogsServiceResponse{}, nil + return nil } // enrichLogsWithDatasets adds dataset_ids to ScopeLogs attributes based on policy_name. -func enrichLogsWithDatasets(req *collectorlogs.ExportLogsServiceRequest, repo interface{}) { - // Type assertion to allow for easier future mocking if needed - policyRepo, ok := repo.(interface { - GetByName(policyName string) (interface { - GetDatasetIDs() []string - }, error) - }) - if !ok { - // If repo doesn't have the right interface, skip enrichment - return - } - +func enrichLogsWithDatasets(req *collectorlogs.ExportLogsServiceRequest, repo policies.PolicyRepo) { for _, rl := range req.ResourceLogs { if rl == nil { continue } + + // Find policy_name attribute in Resource attributes first + policyName := "" + if rl.Resource != nil && rl.Resource.Attributes != nil { + for _, attr := range rl.Resource.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + if sv := attr.Value.GetStringValue(); sv != "" { + policyName = sv + break + } + } + } + } + for _, sl := range rl.ScopeLogs { if sl == nil || sl.Scope == nil || sl.Scope.Attributes == nil { continue } - // Find policy_name attribute - policyName := "" - for _, attr := range sl.Scope.Attributes { - if attr != nil && attr.Key == "policy_name" && attr.Value != nil { - if sv := attr.Value.GetStringValue(); sv != "" { - policyName = sv - break + // If not found in Resource, check Scope attributes for backward compatibility + if policyName == "" { + for _, attr := range sl.Scope.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + if sv := attr.Value.GetStringValue(); sv != "" { + policyName = sv + break + } } } } @@ -133,7 +195,7 @@ func enrichLogsWithDatasets(req *collectorlogs.ExportLogsServiceRequest, repo in } // Lookup policy and get dataset IDs - policy, err := policyRepo.GetByName(policyName) + policy, err := repo.GetByName(policyName) if err != nil { // Policy not found; skip enrichment for this scope slog.Debug("policy not found", "name", policyName, "error", err) From e10fd3e1f4842184a309e330db6ca9011cc04573 Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Thu, 27 Nov 2025 13:21:37 +0000 Subject: [PATCH 5/6] fixes tests --- agent/otlpbridge/handlers.go | 68 +++++++++++++++---------------- agent/otlpbridge/handlers_test.go | 2 +- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/agent/otlpbridge/handlers.go b/agent/otlpbridge/handlers.go index 32c5d03..613e732 100644 --- a/agent/otlpbridge/handlers.go +++ b/agent/otlpbridge/handlers.go @@ -21,23 +21,23 @@ type traceServer struct { collectortrace.UnimplementedTraceServiceServer } -func (s *traceServer) Export(_ context.Context, _ *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) { - // pub := s.bridge.GetPublisher() - // if pub == nil { - // return nil, fmt.Errorf("publisher not yet initialized") - // } - // topic := s.bridge.GetIngestTopic() - // if topic == "" { - // return nil, fmt.Errorf("topic not yet initialized") - // } - - // payload, err := s.bridge.enc.Marshal(req) - // if err != nil { - // return nil, err - // } - // if err := pub.Publish(ctx, topic, payload); err != nil { - // return nil, err - // } +func (s *traceServer) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) { + pub := s.bridge.GetPublisher() + if pub == nil { + return nil, fmt.Errorf("publisher not yet initialized") + } + topic := s.bridge.GetIngestTopic() + if topic == "" { + return nil, fmt.Errorf("topic not yet initialized") + } + + payload, err := s.bridge.enc.Marshal(req) + if err != nil { + return nil, err + } + if err := pub.Publish(ctx, topic, payload); err != nil { + return nil, err + } return &collectortrace.ExportTraceServiceResponse{}, nil } @@ -47,23 +47,23 @@ type metricsServer struct { collectormetrics.UnimplementedMetricsServiceServer } -func (s *metricsServer) Export(_ context.Context, _ *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) { - // pub := s.bridge.GetPublisher() - // if pub == nil { - // return nil, fmt.Errorf("publisher not yet initialized") - // } - // topic := s.bridge.GetIngestTopic() - // if topic == "" { - // return nil, fmt.Errorf("topic not yet initialized") - // } - - // payload, err := s.bridge.enc.Marshal(req) - // if err != nil { - // return nil, err - // } - // if err := pub.Publish(ctx, topic, payload); err != nil { - // return nil, err - // } +func (s *metricsServer) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) { + pub := s.bridge.GetPublisher() + if pub == nil { + return nil, fmt.Errorf("publisher not yet initialized") + } + topic := s.bridge.GetIngestTopic() + if topic == "" { + return nil, fmt.Errorf("topic not yet initialized") + } + + payload, err := s.bridge.enc.Marshal(req) + if err != nil { + return nil, err + } + if err := pub.Publish(ctx, topic, payload); err != nil { + return nil, err + } return &collectormetrics.ExportMetricsServiceResponse{}, nil } diff --git a/agent/otlpbridge/handlers_test.go b/agent/otlpbridge/handlers_test.go index 406d00b..23a832b 100644 --- a/agent/otlpbridge/handlers_test.go +++ b/agent/otlpbridge/handlers_test.go @@ -64,7 +64,7 @@ func TestLogsHandler_Export_Publishes(t *testing.T) { enc: ProtobufEncoder{}, } bridge.SetPublisher(fp) - bridge.SetIngestTopic("logs") + bridge.SetTelemetryTopic("logs") s := &logsServer{bridge: bridge} _, err := s.Export(context.Background(), &collectorlogs.ExportLogsServiceRequest{}) if err != nil { From 8dc7047430a9a73a395563f126c6cd73963828e7 Mon Sep 17 00:00:00 2001 From: James Jeffries Date: Thu, 27 Nov 2025 16:00:21 +0000 Subject: [PATCH 6/6] improves logging --- agent/agent.go | 6 +++--- agent/otlpbridge/handlers.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index cd5dded..b6f65ae 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -207,14 +207,14 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err a.logger.Warn("Overriding OTLP gRPC URL for fleet config manager", "url", grpcURL) } otlpSection["grpc"] = otlpBridgeEndpoint - a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", otlpBridgeEndpoint) } else { // otlp section doesn't exist, create it commonMap["otlp"] = map[string]any{ "grpc": otlpBridgeEndpoint, } - a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", otlpBridgeEndpoint) } } } else { @@ -224,7 +224,7 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err "grpc": otlpBridgeEndpoint, }, } - a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", otlpBridgeEndpoint) } } diff --git a/agent/otlpbridge/handlers.go b/agent/otlpbridge/handlers.go index 613e732..2626d58 100644 --- a/agent/otlpbridge/handlers.go +++ b/agent/otlpbridge/handlers.go @@ -136,7 +136,7 @@ func (s *logsServer) publishToIngestTopic(ctx context.Context, req *collectorlog func (s *logsServer) publishToTelemetryTopic(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest, pub Publisher) error { topic := s.bridge.GetTelemetryTopic() if topic == "" { - return fmt.Errorf("telemetrytopic not yet initialized") + return fmt.Errorf("telemetry topic not yet initialized") } return s.publish(ctx, req, pub, topic)