Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 28 additions & 29 deletions agent/configmgr/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ type heartbeater struct {
heartbeatCtx context.Context
}

func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, agentID string, _ time.Time, _ messages.HeartbeatState) {
func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error, _ string, _ time.Time, _ messages.HeartbeatState) {
hbData := messages.Heartbeat{
AgentID: agentID,
Version: "1.0.0",
SchemaVersion: messages.CurrentHeartbeatSchemaVersion,
TimeStamp: time.Now().UTC(),
State: 1,
}

body, err := json.Marshal(hbData)
Expand All @@ -71,7 +72,7 @@ func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc,
// (if any) remain accurate.
hb.heartbeatCtx = ctx

hb.logger.Debug("start heartbeats routine", slog.Any("routine", ctx.Value("routine")))
hb.logger.Debug("start heartbeats routine")
hb.sendSingleHeartbeat(ctx, publishFunc, agentID, time.Now(), messages.Online)

for {
Expand Down Expand Up @@ -103,17 +104,22 @@ func newFleetConfigManager(ctx context.Context, logger *slog.Logger, pMgr policy
}

func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := context.Background()

fleetManager.logger.Info("starting fleet config manager", "token_url", cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, "client_id", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, "client_secret", cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret)
// call the token url to get the token
token, err := fleetManager.getToken(ctx, cfg.OrbAgent.ConfigManager.Sources.Fleet.TokenURL, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret)
if err != nil {
return err
}

jwtClaims, err := parseJWTClaims(token.AccessToken)
if err != nil {
return fmt.Errorf("failed to parse JWT claims: %w", err)
}

// generate topics from JWT claims and config agent_id using hardcoded templates
topics, err := generateTopicsFromTemplate(token.AccessToken, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID)
topics, err := generateTopicsFromTemplate(jwtClaims)
if err != nil {
return fmt.Errorf("failed to generate topics: %w", err)
}
Expand All @@ -125,30 +131,30 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
"outbox_topic", topics.Outbox)

// use MQTT URL from token response or fallback to config
mqttURL := token.MQTTURL
mqttURL := jwtClaims.MqttURL
if mqttURL == "" {
mqttURL = cfg.OrbAgent.ConfigManager.Sources.Fleet.MQTTURL
}
if mqttURL == "" {
return fmt.Errorf("no MQTT URL provided in token response or config")
}

// use the generated topics to connect over MQTT v5
err = fleetManager.connect(mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.AgentID)
err = fleetManager.connect(ctx, mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, jwtClaims.Zone)
if err != nil {
return err
}
return nil
}

func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error {
func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, clientID, zone string) error {
// Parse the ORB URL
serverURL, err := url.Parse(fleetMQTTURL)
if err != nil {
fleetManager.logger.Error("failed to parse ORB URL", "url", fleetMQTTURL, "error", err)
return err
}

// Configure autopaho client
clientID := "orb-agent-" + time.Now().Format("20060102150405")

cfg := autopaho.ClientConfig{
ServerUrls: []*url.URL{serverURL},
KeepAlive: 30,
Expand All @@ -160,7 +166,7 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context,
OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) {
fleetManager.logger.Info("MQTT connection established", "server", serverURL.String())

// Subscribe to "mytopic" when connection is established
// //Subscribe to "mytopic" when connection is established
// _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
// Subscriptions: []paho.SubscribeOptions{
// {Topic: topics.Inbox, QoS: 1},
Expand All @@ -173,11 +179,11 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context,
// }

// start heartbeat loop bound to the same connection-level context
fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error {
go fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error {
publishResponse, err := cm.Publish(ctx, &paho.Publish{
Topic: topics.Heartbeat,
Payload: payload,
QoS: 1,
QoS: 0,
Retain: false,
})
if err != nil {
Expand All @@ -194,7 +200,7 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context,
"payload", string(payload),
)
return nil
}, agentID)
}, clientID)

go fleetManager.sendCapabilities(ctx, backends, func(ctx context.Context, payload []byte) error {
_, err := cm.Publish(ctx, &paho.Publish{
Expand Down Expand Up @@ -234,12 +240,9 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context,

// Set authentication if token is provided
if token != "" {
cfg.ConnectUsername = "token" // Using token as username, adjust as needed for your auth scheme
fleetManager.logger.Info("setting MQTT authentication", "client_id", clientID, "zone", zone)
cfg.ConnectUsername = fmt.Sprintf("%s:%s", zone, clientID)
cfg.ConnectPassword = []byte(token)
} else {
// TODO: remove these temporary credentials
cfg.ConnectUsername = "admin"
cfg.ConnectPassword = []byte("admin")
}

// Create and start the connection manager using the long-lived context.
Expand All @@ -264,12 +267,6 @@ func (fleetManager *fleetConfigManager) connectWithContext(ctx context.Context,
return nil
}

// connect is a backward-compatibility shim that invokes connectWithContext with
// the fleet manager's root context.
func (fleetManager *fleetConfigManager) connect(fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, agentID string) error {
return fleetManager.connectWithContext(fleetManager.heartbeater.heartbeatCtx, fleetMQTTURL, token, topics, backends, agentID)
}

func (fleetManager *fleetConfigManager) sendCapabilities(ctx context.Context, backends map[string]backend.Backend, publishFunc func(ctx context.Context, payload []byte) error) {
capabilities := messages.Capabilities{
SchemaVersion: messages.CurrentCapabilitiesSchemaVersion,
Expand Down Expand Up @@ -348,14 +345,16 @@ func (fleetManager *fleetConfigManager) getToken(ctx context.Context, tokenURL s
fleetManager.logger.Debug("requesting access token", "token_url", tokenURL, "client_id", clientID)

scopes := []string{
"orb.mqtt",
"orb.mqtt:agent",
"orb.mqtt:group",
}

data := url.Values{}
data.Set("grant_type", "client_credentials")
data.Set("scope", strings.Join(scopes, " "))
data.Set("client_id", clientID)
data.Set("client_secret", clientSecret)
data.Set("audience", "orb")

fleetManager.logger.Debug("sending token request", "url", tokenURL, "data", data, "client_id", clientID) //, "client_secret", clientSecret)

Expand Down
Loading