Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions agent/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,12 @@ type GitManager struct {

// FleetManager represents the Orb ConfigManager configuration.
type FleetManager struct {
URL string `yaml:"url"`
TokenURL string `yaml:"token_url"`
ClientID string `yaml:"client_id"`
ClientSecret string `yaml:"client_secret"`
MQTTURL string `yaml:"mqtt_url,omitempty"`
HeartbeatTopic string `yaml:"heartbeat_topic,omitempty"`
CapabilitiesTopic string `yaml:"capabilities_topic,omitempty"`
// TopicName is kept for backward compatibility
TopicName string `yaml:"topic_name,omitempty"`
URL string `yaml:"url"`
TokenURL string `yaml:"token_url"`
ClientID string `yaml:"client_id"`
ClientSecret string `yaml:"client_secret"`
AgentID string `yaml:"agent_id"`
MQTTURL string `yaml:"mqtt_url,omitempty"`
}

// Sources represents the configuration for manager sources, including cloud, local and git.
Expand Down
96 changes: 30 additions & 66 deletions agent/configmgr/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type heartbeater struct {
heartbeatCtx context.Context
}

func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, _ time.Time, _ messages.HeartbeatState) {
func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, agentID string, _ time.Time, _ messages.HeartbeatState) {
hbData := messages.Heartbeat{
AgentID: "orb-agent",
AgentID: agentID,
Version: "1.0.0",
}

Expand All @@ -66,23 +66,23 @@ func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func
// supplied context is cancelled. The cancelFunc parameter is ignored by the
// implementation but is accepted for backward-compatibility with unit tests
// that expect to pass it.
func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, publishFunc func(ctx context.Context, payload []byte) error) {
func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, publishFunc func(ctx context.Context, payload []byte) error, agentID string) {
// Update our internal reference so other methods that read hb.heartbeatCtx
// (if any) remain accurate.
hb.heartbeatCtx = ctx

hb.logger.Debug("start heartbeats routine", slog.Any("routine", ctx.Value("routine")))
hb.sendSingleHeartbeat(ctx, publishFunc, time.Now(), messages.Online)
hb.sendSingleHeartbeat(ctx, publishFunc, agentID, time.Now(), messages.Online)

for {
select {
case <-ctx.Done():
hb.logger.Debug("context done, stopping heartbeats routine")
hb.sendSingleHeartbeat(ctx, publishFunc, time.Now(), messages.Offline)
hb.sendSingleHeartbeat(ctx, publishFunc, agentID, time.Now(), messages.Offline)
hb.heartbeatCtx = nil
return
case t := <-hb.hbTicker.C:
hb.sendSingleHeartbeat(ctx, publishFunc, t, messages.Online)
hb.sendSingleHeartbeat(ctx, publishFunc, agentID, t, messages.Online)
}
}
}
Expand Down Expand Up @@ -112,68 +112,33 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
return err
}

// merge configuration values with token response values (config takes priority)
mqttURL, topics := fleetManager.mergeConfigWithTokenResponse(cfg.OrbAgent.ConfigManager.Sources.Fleet, token)

// use the merged configuration to connect over MQTT v5
err = fleetManager.connect(mqttURL, token.AccessToken, topics, backends)
// generate topics from JWT claims and config agent_id using hardcoded templates
topics, err := generateTopicsFromTemplate(token.AccessToken, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID)
if err != nil {
return err
}
return nil
}

// mergeConfigWithTokenResponse merges configuration values with token response values,
// giving priority to token response values when they are provided
func (fleetManager *fleetConfigManager) mergeConfigWithTokenResponse(fleetCfg config.FleetManager, token *tokenResponse) (string, tokenResponseTopics) {
// Start with configuration values as defaults
mqttURL := fleetCfg.MQTTURL
topics := tokenResponseTopics{
Heartbeat: fleetCfg.HeartbeatTopic,
Capabilities: fleetCfg.CapabilitiesTopic,
}

// Handle legacy TopicName field for backward compatibility - only if specific topics aren't set
if fleetCfg.TopicName != "" && fleetCfg.HeartbeatTopic == "" && fleetCfg.CapabilitiesTopic == "" {
fleetManager.logger.Debug("using legacy topic name as base for heartbeat and capabilities", "topic", fleetCfg.TopicName)
topics.Heartbeat = fleetCfg.TopicName + "/heartbeat"
topics.Capabilities = fleetCfg.TopicName + "/capabilities"
}

// Override with token response values if provided (token takes priority)
if token.MQTTURL != "" {
fleetManager.logger.Debug("using MQTT URL from token response", "token_url", token.MQTTURL, "config_url", fleetCfg.MQTTURL)
mqttURL = token.MQTTURL
} else if mqttURL != "" {
fleetManager.logger.Debug("using MQTT URL from configuration", "config_url", mqttURL)
}

// Token response topics override configuration topics
if token.Topics.Heartbeat != "" {
fleetManager.logger.Debug("using heartbeat topic from token response", "token_topic", token.Topics.Heartbeat, "config_topic", topics.Heartbeat)
topics.Heartbeat = token.Topics.Heartbeat
}

if token.Topics.Capabilities != "" {
fleetManager.logger.Debug("using capabilities topic from token response", "token_topic", token.Topics.Capabilities, "config_topic", topics.Capabilities)
topics.Capabilities = token.Topics.Capabilities
return fmt.Errorf("failed to generate topics: %w", err)
}

// Token response always provides inbox/outbox topics
topics.Inbox = token.Topics.Inbox
topics.Outbox = token.Topics.Outbox

fleetManager.logger.Info("merged configuration and token response",
"mqtt_url", mqttURL,
fleetManager.logger.Info("generated topics from JWT org_id and config agent_id",
"heartbeat_topic", topics.Heartbeat,
"capabilities_topic", topics.Capabilities,
"inbox_topic", topics.Inbox,
"outbox_topic", topics.Outbox)

return mqttURL, topics
// use MQTT URL from token response or fallback to config
mqttURL := token.MQTTURL
if mqttURL == "" {
mqttURL = cfg.OrbAgent.ConfigManager.Sources.Fleet.MQTTURL
}

// use the generated topics to connect over MQTT v5
err = fleetManager.connect(mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID)
if err != nil {
return err
}
return nil
}

func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend) error {
func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error {
// Parse the ORB URL
serverURL, err := url.Parse(fleetMQTTURL)
if err != nil {
Expand Down Expand Up @@ -229,7 +194,7 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context,
"payload", string(payload),
)
return nil
})
}, agentID)

go fleetManager.sendCapabilities(ctx, backends, func(ctx context.Context, payload []byte) error {
_, err := cm.Publish(ctx, &paho.Publish{
Expand Down Expand Up @@ -300,9 +265,9 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context,
}

// connect is a backward-compatibility shim that invokes connectWithContext with
// the fleet managers root context.
func (fleetManager *fleetConfigManager) connect(fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend) error {
return fleetManager.connectWithContext(fleetManager.heartbeater.heartbeatCtx, fleetMQTTURL, token, topics, backends)
// the fleet manager's root context.
func (fleetManager *fleetConfigManager) connect(fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error {
return fleetManager.connectWithContext(fleetManager.heartbeater.heartbeatCtx, fleetMQTTURL, token, topics, backends, agentID)
}

func (fleetManager *fleetConfigManager) sendCapabilities(ctx context.Context, backends map[string]backend.Backend, publishFunc func(ctx context.Context, payload []byte) error) {
Expand Down Expand Up @@ -363,10 +328,9 @@ type tokenResponseTopics struct {
}

type tokenResponse struct {
AccessToken string `json:"access_token"`
MQTTURL string `json:"mqtt_url"`
Topics tokenResponseTopics `json:"topics"`
ExpiresIn int `json:"expires_in"`
AccessToken string `json:"access_token"`
MQTTURL string `json:"mqtt_url"`
ExpiresIn int `json:"expires_in"`
}

func (fleetManager *fleetConfigManager) getToken(ctx context.Context, tokenURL string, clientID string, clientSecret string) (*tokenResponse, error) {
Expand Down
Loading