Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func New(logger *slog.Logger, c config.Config) (Agent, error) {
return nil, err
}

// Pass a background context to the config manager at construction time. The
// manager keeps its own copy and later derives child contexts from the
// runtime context supplied in Agent.Start.
cm := configmgr.New(logger, pm, c.OrbAgent.ConfigManager.Active)

return &orbAgent{
Expand Down
2 changes: 0 additions & 2 deletions agent/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ type FleetManager struct {
SkipTLS bool `yaml:"skip_tls"`
ClientID string `yaml:"client_id"`
ClientSecret string `yaml:"client_secret"`
AgentID string `yaml:"agent_id"`
MQTTURL string `yaml:"mqtt_url,omitempty"`
}

// Sources represents the configuration for manager sources, including cloud, local and git.
Expand Down
145 changes: 106 additions & 39 deletions agent/configmgr/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
var _ Manager = (*fleetConfigManager)(nil)

type fleetConfigManager struct {
logger *slog.Logger
pMgr policymgr.PolicyManager
heartbeater *heartbeater
logger *slog.Logger
pMgr policymgr.PolicyManager
heartbeater *heartbeater
connectionManager *autopaho.ConnectionManager
}

const (
Expand Down Expand Up @@ -144,21 +145,18 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
// use MQTT URL from token response or fallback to config
mqttURL := jwtClaims.MqttURL
if mqttURL == "" {
mqttURL = cfg.OrbAgent.ConfigManager.Sources.Fleet.MQTTURL
}
if mqttURL == "" {
return fmt.Errorf("no MQTT URL provided in token response or config")
return fmt.Errorf("no MQTT URL provided in token response")
}

// use the generated topics to connect over MQTT v5
err = fleetManager.connect(ctx, mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, jwtClaims.Zone)
err = fleetManager.connect(ctx, mqttURL, token.AccessToken, *topics, backends, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientID, jwtClaims.Zone, cfg.OrbAgent.Labels)
if err != nil {
return err
}
return nil
}

func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, clientID, zone string) error {
func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTURL, token string, topics tokenResponseTopics, backends map[string]backend.Backend, clientID, zone string, labels map[string]string) error {
// Parse the ORB URL
serverURL, err := url.Parse(fleetMQTTURL)
if err != nil {
Expand All @@ -177,17 +175,16 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR
OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) {
fleetManager.logger.Info("MQTT connection established", "server", serverURL.String())

// //Subscribe to "mytopic" when connection is established
// _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
// Subscriptions: []paho.SubscribeOptions{
// {Topic: topics.Inbox, QoS: 1},
// },
// })
// if err != nil {
// fleetManager.logger.Error("failed to subscribe", "topic", topics.Inbox, "error", err)
// } else {
// fleetManager.logger.Info("successfully subscribed", "topic", topics.Inbox)
// }
_, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: topics.Inbox, QoS: 1},
},
})
if err != nil {
fleetManager.logger.Error("failed to subscribe", "topic", topics.Inbox, "error", err)
} else {
fleetManager.logger.Info("successfully subscribed", "topic", topics.Inbox)
}

// start heartbeat loop bound to the same connection-level context
go fleetManager.heartbeater.sendHeartbeats(ctx, func() {}, func(ctx context.Context, payload []byte) error {
Expand All @@ -213,7 +210,7 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR
return nil
}, clientID)

go fleetManager.sendCapabilities(ctx, backends, func(ctx context.Context, payload []byte) error {
go fleetManager.sendCapabilities(ctx, backends, labels, func(ctx context.Context, payload []byte) error {
_, err := cm.Publish(ctx, &paho.Publish{
Topic: topics.Capabilities,
Payload: payload,
Expand All @@ -222,6 +219,28 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR
})
if err != nil {
// TODO: reconnect?
fleetManager.logger.Error("failed to publish capabilities", "error", err)
return err
}

fleetManager.logger.Debug("capabilities sent",
"topic", topics.Capabilities,
"payload", string(payload),
)
return nil
})

// TODO: this is a hack to work around the race condition of capabilities not being processed by the time we request group memberships
time.Sleep(10 * time.Second)
go fleetManager.sendGroupMembershipsRequest(ctx, func(ctx context.Context, payload []byte) error {
_, err := cm.Publish(ctx, &paho.Publish{
Topic: topics.Outbox,
Payload: payload,
QoS: 1,
Retain: false,
})
if err != nil {
fleetManager.logger.Error("failed to publish group memberships request", "error", err)
return err
}
return nil
Expand All @@ -234,14 +253,17 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR
ClientID: clientID,
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
func(pr paho.PublishReceived) (bool, error) {
messageType := pr.Packet.Properties.User.Get(messageTypeUserPropertyKey)
// Log any published messages to subscribed topics
fleetManager.logger.Info("received MQTT message",
"topic", pr.Packet.Topic,
"payload", string(pr.Packet.Payload),
"message_type", messageType)
fleetManager.logger.Info("received MQTT message", "topic", pr.Packet.Topic)

orgID := strings.Split(pr.Packet.Topic, "/")[1]
var rpc messages.RPC
if err := json.Unmarshal(pr.Packet.Payload, &rpc); err != nil {
fleetManager.logger.Error("failed to unmarshal RPC", "error", err)
return true, nil
}

fleetManager.dispatchToHandlers(messageType, pr.Packet.Payload)
fleetManager.dispatchToHandlers(rpc.Func, rpc, orgID)

return true, nil
},
Expand All @@ -257,7 +279,7 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR
}

// Create and start the connection manager using the long-lived context.
connectionManager, err := autopaho.NewConnection(ctx, cfg)
fleetManager.connectionManager, err = autopaho.NewConnection(ctx, cfg)
if err != nil {
fleetManager.logger.Error("failed to create MQTT connection", "error", err)
return err
Expand All @@ -268,7 +290,7 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR
waitCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

err = connectionManager.AwaitConnection(waitCtx)
err = fleetManager.connectionManager.AwaitConnection(waitCtx)
if err != nil {
fleetManager.logger.Error("failed to establish initial MQTT connection", "error", err)
return err
Expand All @@ -278,10 +300,29 @@ func (fleetManager *fleetConfigManager) connect(ctx context.Context, fleetMQTTUR
return nil
}

func (fleetManager *fleetConfigManager) sendCapabilities(ctx context.Context, backends map[string]backend.Backend, publishFunc func(ctx context.Context, payload []byte) error) {
func (fleetManager *fleetConfigManager) sendGroupMembershipsRequest(ctx context.Context, publishFunc func(ctx context.Context, payload []byte) error) {
body, err := json.Marshal(messages.RPC{
// SchemaVersion: messages.CurrentRPCSchemaVersion, // TODO: add schema version check later
Func: "group_membership_req",
Payload: messages.SendGroupMembershipsRequest{},
})
if err != nil {
fleetManager.logger.Error("backend failed to marshal capabilities, skipping", "error", err)
return
}

fleetManager.logger.Info("sending group memberships request", "value", string(body))
err = publishFunc(ctx, body)
if err != nil {
fleetManager.logger.Error("error sending group memberships request", "error", err)
}
fleetManager.logger.Info("group memberships request sent", "value", string(body))
}

func (fleetManager *fleetConfigManager) sendCapabilities(ctx context.Context, backends map[string]backend.Backend, labels map[string]string, publishFunc func(ctx context.Context, payload []byte) error) {
capabilities := messages.Capabilities{
SchemaVersion: messages.CurrentCapabilitiesSchemaVersion,
// AgentTags: fleetManager.config.OrbAgent.Tags, // TODO: add tags
AgentLabels: labels,
OrbAgent: messages.OrbAgentInfo{
Version: version.GetBuildVersion(),
},
Expand Down Expand Up @@ -318,14 +359,40 @@ func (fleetManager *fleetConfigManager) sendCapabilities(ctx context.Context, ba
}
}

func (fleetManager *fleetConfigManager) dispatchToHandlers(_ string, _ []byte) {
// TODO: dispatch to handlers
// switch messageType {
// case "config":
// fleetManager.handleConfig(payload)
// case "policy":
// fleetManager.handlePolicy(payload)
// }
func (fleetManager *fleetConfigManager) dispatchToHandlers(messageType string, rpc messages.RPC, orgID string) {
switch messageType {
case "group_membership":
fleetManager.handleGroupMemberships(rpc, orgID)
default:
fleetManager.logger.Debug("unknown message type", "message_type", messageType)
}
}

func (fleetManager *fleetConfigManager) handleGroupMemberships(rpc messages.RPC, orgID string) {
fleetManager.logger.Debug("handling group memberships", "payload", rpc.Payload)
payloadJSON, err := json.Marshal(rpc.Payload)
if err != nil {
fleetManager.logger.Error("failed to marshal payload", "error", err)
return
}
groupMeberships := messages.GroupMemberships{}
if err := json.Unmarshal(payloadJSON, &groupMeberships); err != nil {
fleetManager.logger.Error("failed to unmarshal payload", "error", err)
return
}

for _, group := range groupMeberships.Groups {
fleetManager.logger.Info("subscribing to group", "group", group)
_, err := fleetManager.connectionManager.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: groupTopic(orgID, group.GroupID), QoS: 1},
},
})
if err != nil {
fleetManager.logger.Error("failed to subscribe to group", "error", err)
}
fleetManager.logger.Info("subscribed to group topic for group ID", "group_id", group.GroupID)
}
}

type tokenResponseTopics struct {
Expand Down
Loading