diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index f829435..a59ab5b 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -20,22 +20,30 @@ import ( var _ Manager = (*fleetConfigManager)(nil) type fleetConfigManager struct { - logger *slog.Logger - connection *fleet.MQTTConnection - authTokenManager *fleet.AuthTokenManager - resetChan chan struct{} - backendState backend.StateRetriever - policyManager policymgr.PolicyManager - otlpBridge *otlpbridge.BridgeServer + logger *slog.Logger + connection *fleet.MQTTConnection + authTokenManager *fleet.AuthTokenManager + resetChan chan struct{} + reconnectChan chan struct{} + backendState backend.StateRetriever + policyManager policymgr.PolicyManager + otlpBridge *otlpbridge.BridgeServer + config config.Config + backends map[string]backend.Backend + labels map[string]string + configYaml string + connectionDetails fleet.ConnectionDetails } func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *fleetConfigManager { resetChan := make(chan struct{}, 1) + reconnectChan := make(chan struct{}, 1) return &fleetConfigManager{ logger: logger, - connection: fleet.NewMQTTConnection(logger, pMgr, resetChan, backendState), + connection: fleet.NewMQTTConnection(logger, pMgr, resetChan, reconnectChan, backendState), authTokenManager: fleet.NewAuthTokenManager(logger), resetChan: resetChan, + reconnectChan: reconnectChan, backendState: backendState, policyManager: pMgr, } @@ -106,6 +114,14 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st if err != nil { return fmt.Errorf("failed to convert config to safe string: %w", err) } + + // Store connection state for reconnection + fleetManager.config = cfg + fleetManager.backends = backends + fleetManager.labels = cfg.OrbAgent.Labels + fleetManager.configYaml = string(configYaml) + fleetManager.connectionDetails = connectionDetails + err = fleetManager.connection.Connect(ctx, connectionDetails, backends, cfg.OrbAgent.Labels, string(configYaml)) if err != nil { return err @@ -158,6 +174,65 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) }) + // Start goroutine to handle reconnect requests (JWT refresh) + go func() { + for range fleetManager.reconnectChan { + fleetManager.logger.Info("JWT refresh and reconnection requested") + if err := fleetManager.refreshAndReconnect(ctx, timeout); err != nil { + fleetManager.logger.Error("failed to refresh and reconnect", "error", err) + } + } + }() + + return nil +} + +// refreshAndReconnect refreshes the JWT token and reconnects to MQTT +func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error { + // Refresh JWT token + token, err := fleetManager.authTokenManager.RefreshToken(ctx) + if err != nil { + return fmt.Errorf("failed to refresh token: %w", err) + } + + // Parse new JWT claims + jwtClaims, err := fleet.ParseJWTClaims(token.AccessToken) + if err != nil { + return fmt.Errorf("failed to parse JWT claims: %w", err) + } + + // Regenerate topics + topics, err := fleet.GenerateTopicsFromTemplate(jwtClaims) + if err != nil { + return fmt.Errorf("failed to generate topics: %w", err) + } + + fleetManager.logger.Info("refreshed JWT and generated new topics", + "heartbeat_topic", topics.Heartbeat, + "capabilities_topic", topics.Capabilities, + "inbox_topic", topics.Inbox, + "outbox_topic", topics.Outbox) + + // Update connection details + newConnectionDetails := fleet.ConnectionDetails{ + MQTTURL: jwtClaims.MqttURL, + Token: token.AccessToken, + AgentID: jwtClaims.AgentID, + Topics: *topics, + ClientID: fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.ClientID, + Zone: jwtClaims.Zone, + } + + // Store updated connection details + fleetManager.connectionDetails = newConnectionDetails + + // Reconnect with new token + err = fleetManager.connection.Reconnect(ctx, newConnectionDetails, fleetManager.backends, fleetManager.labels, fleetManager.configYaml, timeout) + if err != nil { + return fmt.Errorf("failed to reconnect: %w", err) + } + + fleetManager.logger.Info("successfully refreshed JWT and reconnected") return nil } diff --git a/agent/configmgr/fleet/auth.go b/agent/configmgr/fleet/auth.go index 0d824b6..2280ea2 100644 --- a/agent/configmgr/fleet/auth.go +++ b/agent/configmgr/fleet/auth.go @@ -16,7 +16,14 @@ import ( // AuthTokenManager manages auth tokens type AuthTokenManager struct { - logger *slog.Logger + logger *slog.Logger + tokenURL string + skipTLS bool + timeout time.Duration + clientID string + clientSecret string + lastToken *TokenResponse + tokenExpiresAt time.Time } // NewAuthTokenManager creates a new AuthTokenManager @@ -46,6 +53,13 @@ func (fleetManager *AuthTokenManager) GetToken(ctx context.Context, tokenURL str return nil, fmt.Errorf("client secret cannot be empty") } + // Store credentials for future refresh + fleetManager.tokenURL = tokenURL + fleetManager.skipTLS = skipTLS + fleetManager.timeout = timeout + fleetManager.clientID = clientID + fleetManager.clientSecret = clientSecret + fleetManager.logger.Debug("requesting access token", "token_url", tokenURL, "client_id", clientID) scopes := []string{ @@ -121,5 +135,29 @@ func (fleetManager *AuthTokenManager) GetToken(ctx context.Context, tokenURL str "expires_in", TokenResponse.ExpiresIn, "mqtt_url", TokenResponse.MQTTURL) + // Store token and calculate expiration time (with 5-minute buffer) + fleetManager.lastToken = &TokenResponse + if TokenResponse.ExpiresIn > 0 { + fleetManager.tokenExpiresAt = time.Now().Add(time.Duration(TokenResponse.ExpiresIn)*time.Second - 5*time.Minute) + } + return &TokenResponse, nil } + +// RefreshToken refreshes the auth token using stored credentials +func (fleetManager *AuthTokenManager) RefreshToken(ctx context.Context) (*TokenResponse, error) { + if fleetManager.tokenURL == "" { + return nil, fmt.Errorf("cannot refresh token: credentials not initialized") + } + + fleetManager.logger.Info("refreshing JWT token") + return fleetManager.GetToken(ctx, fleetManager.tokenURL, fleetManager.skipTLS, fleetManager.timeout, fleetManager.clientID, fleetManager.clientSecret) +} + +// IsTokenExpired checks if the current token is expired or will expire soon +func (fleetManager *AuthTokenManager) IsTokenExpired() bool { + if fleetManager.lastToken == nil { + return true + } + return time.Now().After(fleetManager.tokenExpiresAt) +} diff --git a/agent/configmgr/fleet/connection.go b/agent/configmgr/fleet/connection.go index 6bc4871..3d1a95f 100644 --- a/agent/configmgr/fleet/connection.go +++ b/agent/configmgr/fleet/connection.go @@ -17,17 +17,21 @@ import ( // MQTTConnection manages the MQTT connection type MQTTConnection struct { - logger *slog.Logger - connectionManager *autopaho.ConnectionManager - heartbeater *heartbeater - messaging *Messaging - resetChan chan struct{} - onReadyHooks []func(cm *autopaho.ConnectionManager, topics TokenResponseTopics) - connectionTopics TokenResponseTopics + logger *slog.Logger + connectionManager *autopaho.ConnectionManager + heartbeater *heartbeater + messaging *Messaging + resetChan chan struct{} + onReadyHooks []func(cm *autopaho.ConnectionManager, topics TokenResponseTopics) + connectionTopics TokenResponseTopics + reconnectChan chan struct{} + capabilitiesFailCount int + groupMembershipFailCount int + heartbeatFailCount int } // NewMQTTConnection creates a new MQTTConnection -func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetChan chan struct{}, backendState backend.StateRetriever) *MQTTConnection { +func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetChan chan struct{}, reconnectChan chan struct{}, backendState backend.StateRetriever) *MQTTConnection { groupManager := newGroupManager() return &MQTTConnection{ connectionManager: nil, @@ -36,6 +40,7 @@ func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetC messaging: NewMessaging(logger, pMgr, resetChan, &groupManager), resetChan: resetChan, onReadyHooks: make([]func(cm *autopaho.ConnectionManager, topics TokenResponseTopics), 0), + reconnectChan: reconnectChan, } } @@ -103,7 +108,23 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio } // start heartbeat loop bound to the same connection-level context - go connection.heartbeater.sendHeartbeats(ctx, func() {}, details.Topics.Heartbeat, details.ClientID, connection.publishToTopic) + go connection.heartbeater.sendHeartbeats(ctx, func() {}, details.Topics.Heartbeat, details.ClientID, connection.publishToTopic, func() { + // Track heartbeat failures + connection.heartbeatFailCount++ + connection.logger.Error("heartbeat publish failed", + "fail_count", connection.heartbeatFailCount) + + // After 5 consecutive failures, trigger reconnect + if connection.heartbeatFailCount >= 5 { + connection.logger.Warn("heartbeat publish failed 5 times, triggering JWT refresh and reconnect") + select { + case connection.reconnectChan <- struct{}{}: + default: + connection.logger.Debug("reconnect already in progress") + } + connection.heartbeatFailCount = 0 + } + }) connection.messaging.sendCapabilities(ctx, backends, labels, configFile, func(ctx context.Context, payload []byte) error { _, err := cm.Publish(ctx, &paho.Publish{ @@ -113,11 +134,26 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio Retain: false, }) if err != nil { - // TODO: reconnect? - connection.logger.Error("failed to publish capabilities", "error", err) + connection.capabilitiesFailCount++ + connection.logger.Error("failed to publish capabilities", + "error", err, + "fail_count", connection.capabilitiesFailCount) + + // After 1 retry (2 failures), trigger reconnect + if connection.capabilitiesFailCount >= 2 { + connection.logger.Warn("capabilities publish failed twice, triggering JWT refresh and reconnect") + select { + case connection.reconnectChan <- struct{}{}: + default: + connection.logger.Debug("reconnect already in progress") + } + connection.capabilitiesFailCount = 0 + } return err } + // Reset counter on success + connection.capabilitiesFailCount = 0 connection.logger.Debug("capabilities sent", "topic", details.Topics.Capabilities, "payload", string(payload), @@ -135,9 +171,26 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio Retain: false, }) if err != nil { - connection.logger.Error("failed to publish group memberships request", "error", err) + connection.groupMembershipFailCount++ + connection.logger.Error("failed to publish group memberships request", + "error", err, + "fail_count", connection.groupMembershipFailCount) + + // After 1 retry (2 failures), trigger reconnect + if connection.groupMembershipFailCount >= 2 { + connection.logger.Warn("group membership publish failed twice, triggering JWT refresh and reconnect") + select { + case connection.reconnectChan <- struct{}{}: + default: + connection.logger.Debug("reconnect already in progress") + } + connection.groupMembershipFailCount = 0 + } return err } + + // Reset counter on success + connection.groupMembershipFailCount = 0 return nil }) }, @@ -205,6 +258,37 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio return nil } +// Reconnect reconnects to the MQTT broker with new connection details (e.g., refreshed JWT) +func (connection *MQTTConnection) Reconnect(ctx context.Context, details ConnectionDetails, backends map[string]backend.Backend, labels map[string]string, configFile string, timeout time.Duration) error { + connection.logger.Info("reconnecting to MQTT broker with refreshed credentials") + + // Disconnect the existing connection + if connection.connectionManager != nil { + disconnectCtx, cancel := context.WithTimeout(ctx, timeout) + connection.heartbeater.stop(details.Topics.Heartbeat, connection.publishToTopic) + err := connection.connectionManager.Disconnect(disconnectCtx) + cancel() + if err != nil { + connection.logger.Error("failed to disconnect during reconnect", "error", err) + // Continue anyway to try to establish new connection + } + } + + // Reset failure counters + connection.capabilitiesFailCount = 0 + connection.groupMembershipFailCount = 0 + connection.heartbeatFailCount = 0 + + // Connect with new details + err := connection.Connect(ctx, details, backends, labels, configFile) + if err != nil { + return fmt.Errorf("failed to connect during reconnect: %w", err) + } + + connection.logger.Info("successfully reconnected to MQTT broker") + return nil +} + // Disconnect disconnects from the MQTT broker func (connection *MQTTConnection) Disconnect(ctx context.Context, heartbeatTopic string) error { connection.heartbeater.stop(heartbeatTopic, connection.publishToTopic) @@ -239,5 +323,8 @@ func (connection *MQTTConnection) publishToTopic(ctx context.Context, topic stri connection.logger.Error("failed to publish to topic", "topic", topic, "error", err) return err } + // Reset heartbeat failure counter on successful publish + // (heartbeats use this function, so successful publish means connection is ok) + connection.heartbeatFailCount = 0 return nil } diff --git a/agent/configmgr/fleet/connection_hooks_test.go b/agent/configmgr/fleet/connection_hooks_test.go index 04baa3d..5733e07 100644 --- a/agent/configmgr/fleet/connection_hooks_test.go +++ b/agent/configmgr/fleet/connection_hooks_test.go @@ -32,7 +32,8 @@ func (noopBackendState) Get() map[string]*backend.State { return map[string]*bac func TestAddOnReadyHook_RegistersHook(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) reset := make(chan struct{}, 1) - conn := NewMQTTConnection(logger, noopPM{}, reset, noopBackendState{}) + reconnect := make(chan struct{}, 1) + conn := NewMQTTConnection(logger, noopPM{}, reset, reconnect, noopBackendState{}) if len(conn.onReadyHooks) != 0 { t.Fatalf("expected 0 hooks initially, got %d", len(conn.onReadyHooks)) @@ -48,7 +49,8 @@ func TestAddOnReadyHook_RegistersHook(t *testing.T) { func TestConnect_StoresTopicsBeforeConnecting(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) reset := make(chan struct{}, 1) - conn := NewMQTTConnection(logger, noopPM{}, reset, noopBackendState{}) + reconnect := make(chan struct{}, 1) + conn := NewMQTTConnection(logger, noopPM{}, reset, reconnect, noopBackendState{}) details := ConnectionDetails{ MQTTURL: "mqtt://localhost:1883", diff --git a/agent/configmgr/fleet/connection_test.go b/agent/configmgr/fleet/connection_test.go index 00d676c..682e20d 100644 --- a/agent/configmgr/fleet/connection_test.go +++ b/agent/configmgr/fleet/connection_test.go @@ -70,7 +70,8 @@ func TestFleetConfigManager_Connect_InvalidURL(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) mockPMgr := &mockPolicyManagerForFleet{} resetChan := make(chan struct{}, 1) - connection := NewMQTTConnection(logger, mockPMgr, resetChan, &mockBackendState{}) + reconnectChan := make(chan struct{}, 1) + connection := NewMQTTConnection(logger, mockPMgr, resetChan, reconnectChan, &mockBackendState{}) // Act with invalid URL backends := make(map[string]backend.Backend) @@ -93,7 +94,8 @@ func TestFleetConfigManager_Connect_ValidURL(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) mockPMgr := &mockPolicyManagerForFleet{} resetChan := make(chan struct{}, 1) - connection := NewMQTTConnection(logger, mockPMgr, resetChan, &mockBackendState{}) + reconnectChan := make(chan struct{}, 1) + connection := NewMQTTConnection(logger, mockPMgr, resetChan, reconnectChan, &mockBackendState{}) // Act with valid URL but don't expect successful connection // since we don't have a real MQTT server diff --git a/agent/configmgr/fleet/heartbeats.go b/agent/configmgr/fleet/heartbeats.go index 15343a0..0c8f60a 100644 --- a/agent/configmgr/fleet/heartbeats.go +++ b/agent/configmgr/fleet/heartbeats.go @@ -37,10 +37,10 @@ func newHeartbeater(logger *slog.Logger, backendState backend.StateRetriever, po func (hb *heartbeater) stop(heartbeatTopic string, publishFunc func(ctx context.Context, topic string, payload []byte) error) { hb.hbTicker.Stop() - hb.sendSingleHeartbeat(hb.heartbeatCtx, heartbeatTopic, publishFunc, "", time.Now(), messages.HeartbeatState(messages.Offline)) + hb.sendSingleHeartbeat(hb.heartbeatCtx, heartbeatTopic, publishFunc, "", time.Now(), messages.HeartbeatState(messages.Offline), nil) } -func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, heartbeatTopic string, publishFunc func(ctx context.Context, topic string, payload []byte) error, _ string, _ time.Time, _ messages.HeartbeatState) { +func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, heartbeatTopic string, publishFunc func(ctx context.Context, topic string, payload []byte) error, _ string, _ time.Time, _ messages.HeartbeatState, onFailure func()) { hbData := messages.Heartbeat{ SchemaVersion: messages.CurrentHeartbeatSchemaVersion, TimeStamp: time.Now().UTC(), @@ -58,6 +58,9 @@ func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, heartbeatTopic s if err := publishFunc(ctx, heartbeatTopic, body); err != nil { hb.logger.Error("error sending heartbeat", "error", err) + if onFailure != nil { + onFailure() + } } else { hb.logger.Debug("heartbeat sent", "payload", string(body)) } @@ -114,23 +117,23 @@ func (hb *heartbeater) getGroupState() map[string]messages.GroupStateInfo { // supplied context is cancelled. The cancelFunc parameter is ignored by the // implementation but is accepted for backward-compatibility with unit tests // that expect to pass it. -func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, heartbeatTopic string, agentID string, publishFunc func(ctx context.Context, topic string, payload []byte) error) { +func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, heartbeatTopic string, agentID string, publishFunc func(ctx context.Context, topic string, payload []byte) error, onFailure func()) { // Update our internal reference so other methods that read hb.heartbeatCtx // (if any) remain accurate. hb.heartbeatCtx = ctx hb.logger.Debug("start heartbeats routine") - hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Online) + hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Online, onFailure) for { select { case <-ctx.Done(): hb.logger.Debug("context done, stopping heartbeats routine") - hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Offline) + hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Offline, nil) hb.heartbeatCtx = nil return case t := <-hb.hbTicker.C: - hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, t, messages.Online) + hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, t, messages.Online, onFailure) } } } diff --git a/agent/configmgr/fleet/heartbeats_test.go b/agent/configmgr/fleet/heartbeats_test.go index 6ca2469..7b81e38 100644 --- a/agent/configmgr/fleet/heartbeats_test.go +++ b/agent/configmgr/fleet/heartbeats_test.go @@ -142,7 +142,7 @@ func TestHeartbeater_SendSingleHeartbeat_Success(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil) // Act - hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online, nil) // Assert: ensure one publish happened with a valid heartbeat payload calls := mockPublish.Calls @@ -172,7 +172,7 @@ func TestHeartbeater_SendSingleHeartbeat_PublishError(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(publishError) // Act - should not panic despite publish error - hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online, nil) // Assert mockPublish.AssertExpectations(t) @@ -194,7 +194,7 @@ func TestHeartbeater_SendSingleHeartbeat_HeartbeatContent(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -225,7 +225,7 @@ func TestHeartbeater_SendHeartbeats_InitialHeartbeat(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil).Once() // Act - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Give some time for initial heartbeat time.Sleep(10 * time.Millisecond) @@ -254,7 +254,7 @@ func TestHeartbeater_SendHeartbeats_PeriodicHeartbeats(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil).Times(4) // Act - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Wait for some periodic heartbeats (ticker is 50ms in test) time.Sleep(120 * time.Millisecond) @@ -291,7 +291,7 @@ func TestHeartbeater_SendHeartbeats_ContextCancellation(t *testing.T) { // Act go func() { - hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc) + hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc, nil) done <- true }() @@ -328,7 +328,7 @@ func TestHeartbeater_SendHeartbeats_PublishErrors(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(publishError).Times(4) // Act - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Wait for some heartbeats with errors time.Sleep(120 * time.Millisecond) @@ -357,7 +357,7 @@ func TestHeartbeater_SendHeartbeats_ConcurrentCancellation(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil).Maybe() // Act - start heartbeats - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Cancel immediately in a separate goroutine go func() { @@ -402,7 +402,7 @@ func TestHeartbeater_SendHeartbeats_HeartbeatStates(t *testing.T) { // Act go func() { - hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc) + hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc, nil) done <- true }() @@ -557,7 +557,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithBackendState(t *testing.T) { ctx := context.Background() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -611,7 +611,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithoutBackendState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -641,7 +641,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithEmptyBackendState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -696,7 +696,7 @@ func TestHeartbeater_SendSingleHeartbeat_BackendStateAllStatuses(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -897,7 +897,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithPolicyState(t *testing.T) { ctx := context.Background() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -959,7 +959,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithPolicyStateError(t *testing.T) { testTime := time.Now() // Act - should not panic despite policy state error - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -994,7 +994,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithEmptyPolicyState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1065,7 +1065,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithEmptyGroupState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1104,7 +1104,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithGroupState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1188,7 +1188,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithCompleteState(t *testing.T) { ctx := context.Background() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1248,7 +1248,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoval(t *testing.T) { testTime := time.Now() // Send initial heartbeat with 2 groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat1 messages.Heartbeat @@ -1261,7 +1261,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoval(t *testing.T) { // Send second heartbeat after removal capturedPayload = nil - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat2 messages.Heartbeat @@ -1302,7 +1302,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoveAll(t *testing.T) testTime := time.Now() // Send initial heartbeat with 2 groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat1 messages.Heartbeat @@ -1315,7 +1315,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoveAll(t *testing.T) // Send second heartbeat after removal capturedPayload = nil - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat2 messages.Heartbeat @@ -1345,7 +1345,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { testTime := time.Now() // Heartbeat 1: No groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb1 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb1)) assert.Empty(t, hb1.GroupState) @@ -1354,7 +1354,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { gm.Add(messages.GroupMembershipData{GroupID: "group-1", Name: "Group 1"}) // Heartbeat 2: 1 group - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb2 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb2)) assert.Len(t, hb2.GroupState, 1) @@ -1363,7 +1363,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { gm.Add(messages.GroupMembershipData{GroupID: "group-2", Name: "Group 2"}) // Heartbeat 3: 2 groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb3 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb3)) assert.Len(t, hb3.GroupState, 2) @@ -1372,7 +1372,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { gm.Remove("group-1") // Heartbeat 4: 1 group - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb4 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb4)) assert.Len(t, hb4.GroupState, 1) diff --git a/go.mod b/go.mod index bd1eae5..290e324 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/netboxlabs/orb-agent -go 1.24.4 +go 1.24.10 require ( github.com/eclipse/paho.golang v0.22.0 @@ -22,6 +22,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/log v0.14.0 + go.opentelemetry.io/proto/otlp v1.7.1 + google.golang.org/grpc v1.75.0 + google.golang.org/protobuf v1.36.8 gopkg.in/yaml.v3 v3.0.1 ) @@ -33,11 +36,13 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cloudflare/circl v1.6.1 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/cyphar/filepath-securejoin v0.4.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.6.0 // indirect - github.com/docker/docker v28.0.0+incompatible // indirect + github.com/docker/docker v28.3.3+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect @@ -67,15 +72,17 @@ require ( github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect - github.com/moby/patternmatcher v0.5.0 // indirect - github.com/moby/sys/sequential v0.5.0 // indirect - github.com/moby/sys/user v0.3.0 // indirect + github.com/moby/go-archive v0.1.0 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/atomicwriter v0.1.0 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/moby/sys/user v0.4.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.5.2 // indirect github.com/morikuni/aec v1.0.0 // indirect @@ -100,7 +107,6 @@ require ( go.opentelemetry.io/otel/log v0.14.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect - go.opentelemetry.io/proto/otlp v1.7.1 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/crypto v0.41.0 // indirect golang.org/x/exp v0.0.0-20250531010427-b6e5de432a8b // indirect @@ -110,8 +116,5 @@ require ( golang.org/x/time v0.11.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect - google.golang.org/grpc v1.75.0 // indirect - google.golang.org/protobuf v1.36.8 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect - gotest.tools/v3 v3.5.2 // indirect ) diff --git a/go.sum b/go.sum index bcf4ace..896f96c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg= github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= @@ -21,6 +21,10 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0= github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs= +github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= +github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= +github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= +github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -32,8 +36,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/docker v28.0.0+incompatible h1:Olh0KS820sJ7nPsBKChVhk5pzqcwDR15fumfAd/p9hM= -github.com/docker/docker v28.0.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v28.3.3+incompatible h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI= +github.com/docker/docker v28.3.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -135,8 +139,8 @@ github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4 github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -158,12 +162,16 @@ github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 h1:BpfhmL github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= -github.com/moby/patternmatcher v0.5.0 h1:YCZgJOeULcxLw1Q+sVR636pmS7sPEn1Qo2iAN6M7DBo= -github.com/moby/patternmatcher v0.5.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= -github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= -github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= -github.com/moby/sys/user v0.3.0 h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo= -github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= +github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ= +github.com/moby/go-archive v0.1.0/go.mod h1:G9B+YoujNohJmrIYFBpSd54GTUB4lt9S+xVQvsJyFuo= +github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= +github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= +github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw= +github.com/moby/sys/atomicwriter v0.1.0/go.mod h1:Ul8oqv2ZMNHOceF643P6FKPXeCmYtlQMvpizfsSoaWs= +github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU= +github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko= +github.com/moby/sys/user v0.4.0 h1:jhcMKit7SA80hivmFJcbB1vqmw//wU61Zdui2eQXuMs= +github.com/moby/sys/user v0.4.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ=