Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions agent/configmgr/fleet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ type MQTTConnection struct {

// NewMQTTConnection creates a new MQTTConnection
func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetChan chan struct{}, backendState backend.StateRetriever) *MQTTConnection {
groupManager := newGroupManager()
return &MQTTConnection{
connectionManager: nil,
logger: logger,
heartbeater: newHeartbeater(logger, backendState, pMgr),
messaging: NewMessaging(logger, pMgr, resetChan),
heartbeater: newHeartbeater(logger, backendState, pMgr, &groupManager),
messaging: NewMessaging(logger, pMgr, resetChan, &groupManager),
resetChan: resetChan,
}
}
Expand Down Expand Up @@ -96,6 +97,8 @@ func (connection *MQTTConnection) Connect(ctx context.Context, fleetMQTTURL, tok
return nil
})

// Wait for capabilities to be handled
time.Sleep(10 * time.Second)
go connection.messaging.sendGroupMembershipsRequest(ctx, func(ctx context.Context, payload []byte) error {
_, err := cm.Publish(ctx, &paho.Publish{
Topic: topics.Outbox,
Expand Down
26 changes: 16 additions & 10 deletions agent/configmgr/fleet/from_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import (
type Messaging struct {
logger *slog.Logger
policyManager policymgr.PolicyManager
groupManager *GroupManager
resetChan chan struct{}
}

// NewMessaging creates a new Messaging
func NewMessaging(logger *slog.Logger, policyManager policymgr.PolicyManager, resetChan chan struct{}) *Messaging {
func NewMessaging(logger *slog.Logger, policyManager policymgr.PolicyManager, resetChan chan struct{}, groupManager *GroupManager) *Messaging {
return &Messaging{
logger: logger,
policyManager: policyManager,
groupManager: groupManager,
resetChan: resetChan,
}
}
Expand All @@ -50,7 +52,7 @@ func (messaging *Messaging) DispatchToHandlers(ctx context.Context, payload []by
messaging.logger.Error("failed to unmarshal payload", "error", err)
return err
}
messaging.handleGroupMemberships(ctx, groupMemberships.Payload, orgID, agentID, topicActions.Subscribe, topicActions.Publish)
messaging.handleGroupMemberships(ctx, groupMemberships.Payload, orgID, agentID, topicActions)
case messages.AgentPolicyRPCFunc:
agentPolicies := messages.AgentPolicyRPC{}
if err := json.Unmarshal(payload, &agentPolicies); err != nil {
Expand Down Expand Up @@ -93,25 +95,29 @@ func (messaging *Messaging) DispatchToHandlers(ctx context.Context, payload []by
return nil
}

func (messaging *Messaging) handleGroupMemberships(ctx context.Context, groupMemberships messages.GroupMembershipRPCPayload, orgID string, agentID string, subscribeFunc func(topic string) error, publishFunc func(ctx context.Context, topic string, payload []byte) error) {
func (messaging *Messaging) handleGroupMemberships(ctx context.Context, groupMemberships messages.GroupMembershipRPCPayload, orgID string, agentID string, topicActions TopicActions) {
messaging.logger.Debug("handling group memberships", "payload", groupMemberships)

// if groupMemberships.FullList {
// // TODO: handle when this is the full list. We'll need to
// // - unsubscribe from all group topics not included in this request
// // - subscribe to all group topics
// }
if groupMemberships.FullList {
for _, group := range messaging.groupManager.GetAll() {
if err := topicActions.Unsubscribe(groupTopic(orgID, group.GroupID)); err != nil {
messaging.logger.Error("failed to unsubscribe from group topic", "group_id", group.GroupID, "error", err)
}
messaging.groupManager.Remove(group.GroupID)
}
}
for _, group := range groupMemberships.Groups {
messaging.groupManager.Add(group)
messaging.logger.Info("subscribing to group", "group", group)
topic := groupTopic(orgID, group.GroupID)
err := subscribeFunc(topic)
err := topicActions.Subscribe(topic)
if err != nil {
messaging.logger.Error("failed to subscribe to group", "error", err)
} else {
messaging.logger.Info("subscribed to group topic for group ID", "group_id", group.GroupID)
}
}
err := messaging.sendAgentPoliciesRequest(ctx, orgID, agentID, publishFunc)
err := messaging.sendAgentPoliciesRequest(ctx, orgID, agentID, topicActions.Publish)
if err != nil {
messaging.logger.Error("failed to send agent policies request", "error", err)
}
Expand Down
Loading
Loading