From fddbd5a6ab7938ca712923e1a7fab476c29e6e91 Mon Sep 17 00:00:00 2001 From: Dejan K Date: Wed, 17 Sep 2025 11:10:01 +0200 Subject: [PATCH 1/2] fix(self-hosted-hub): fix agent cleanup logic and handle job failures during cleanup --- self_hosted_hub/cmd/server/main.go | 42 ++++++---- self_hosted_hub/pkg/agentsync/protocol.go | 7 +- self_hosted_hub/pkg/amqp/publisher.go | 18 +++++ .../pkg/workers/agentcleaner/worker.go | 80 ++++++++++++++++--- .../pkg/workers/agentcleaner/worker_test.go | 14 ++-- .../pkg/workers/disconnectedcleaner/worker.go | 77 +++++++++++++----- .../disconnectedcleaner/worker_test.go | 8 +- 7 files changed, 186 insertions(+), 60 deletions(-) diff --git a/self_hosted_hub/cmd/server/main.go b/self_hosted_hub/cmd/server/main.go index f36870f2d..4373529bb 100644 --- a/self_hosted_hub/cmd/server/main.go +++ b/self_hosted_hub/cmd/server/main.go @@ -31,11 +31,6 @@ func startPublicAPI() { panic("Public API port can't be empty") } - rabbitURL := os.Getenv("RABBITMQ_URL") - if rabbitURL == "" { - panic("Public API needs RABBITMQ_URL") - } - provider, err := configureFeatureProvider() if err != nil { panic(err) @@ -52,10 +47,7 @@ func startPublicAPI() { log.Fatalf("Error creating agent counter: %v", err) } - publisher, err := amqp.NewPublisher(rabbitURL) - if err != nil { - log.Fatalf("Error creating AMQP publisher: %v", err) - } + publisher := createPublisher() server, err := publicapi.NewServer(quotaClient, agentCounter, publisher) if err != nil { @@ -83,14 +75,20 @@ func startInternalAPI() { internalapi.RunServer(50051, quotaClient) } -func startAgentCleaner() { +func startAgentCleaners() { + publisher := createPublisher() + go startAgentCleaner(publisher) + go startDisconnectedAgentCleaner(publisher) +} + +func startAgentCleaner(publisher *amqp.Publisher) { log.Println("Starting Agent Cleaner") - agentcleaner.Start() + agentcleaner.Start(publisher) } -func startDisconnectedAgentCleaner() { +func startDisconnectedAgentCleaner(publisher *amqp.Publisher) { log.Println("Starting Disconnected Agent Cleaner") - disconnected_cleaner.Start() + disconnected_cleaner.Start(publisher) } func startMetricsCollector() { @@ -176,8 +174,7 @@ func main() { } if os.Getenv("START_AGENT_CLEANER") == "yes" { - go startAgentCleaner() - go startDisconnectedAgentCleaner() + startAgentCleaners() } if os.Getenv("START_METRICS_COLLECTOR") == "yes" { @@ -188,3 +185,18 @@ func main() { select {} } + +// Creates a publisher for AMQP +// Panics if RABBITMQ_URL is not set or if there is an error creating the publisher +func createPublisher() *amqp.Publisher { + rabbitURL := os.Getenv("RABBITMQ_URL") + if rabbitURL == "" { + panic("RABBITMQ_URL is required to run the service") + } + + publisher, err := amqp.NewPublisher(rabbitURL) + if err != nil { + log.Fatalf("Error creating AMQP publisher: %v", err) + } + return publisher +} diff --git a/self_hosted_hub/pkg/agentsync/protocol.go b/self_hosted_hub/pkg/agentsync/protocol.go index 73c5b617e..18e41152c 100644 --- a/self_hosted_hub/pkg/agentsync/protocol.go +++ b/self_hosted_hub/pkg/agentsync/protocol.go @@ -257,12 +257,7 @@ func handleFinishedJobState(ctx context.Context, publisher *amqp.Publisher, agen * it means the agent was using callbacks, so we don't send them again here. */ if result != "" { - err = publisher.PublishFinishedCallback(ctx, jobID, string(result)) - if err != nil { - return nil, err - } - - err = publisher.PublishTeardownFinishedCallback(ctx, jobID) + err = publisher.HandleJobFinished(ctx, jobID, string(result)) if err != nil { return nil, err } diff --git a/self_hosted_hub/pkg/amqp/publisher.go b/self_hosted_hub/pkg/amqp/publisher.go index e096b4401..45452c0cb 100644 --- a/self_hosted_hub/pkg/amqp/publisher.go +++ b/self_hosted_hub/pkg/amqp/publisher.go @@ -95,6 +95,24 @@ func (p *Publisher) PublishTeardownFinishedCallback(ctx context.Context, jobID s }) } +/* + * This function is used to publish the job finished and teardown finished callbacks. + * It is used by the job-finished sync request and agent cleaners. + */ +func (p *Publisher) HandleJobFinished(ctx context.Context, jobID string, result string) error { + err := p.PublishFinishedCallback(ctx, jobID, result) + if err != nil { + return err + } + + err = p.PublishTeardownFinishedCallback(ctx, jobID) + if err != nil { + return err + } + + return nil +} + func buildJobFinishedMessage(jobId, result string) ([]byte, error) { /* diff --git a/self_hosted_hub/pkg/workers/agentcleaner/worker.go b/self_hosted_hub/pkg/workers/agentcleaner/worker.go index a4da907db..27a42cb2e 100644 --- a/self_hosted_hub/pkg/workers/agentcleaner/worker.go +++ b/self_hosted_hub/pkg/workers/agentcleaner/worker.go @@ -1,44 +1,100 @@ package agentcleaner import ( + "context" "os" "time" log "github.com/sirupsen/logrus" + "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" "gorm.io/gorm" ) const CleanerName = "self-hosted-agents-cleaner" +const batchSize = 100 -func Start() { +func Start(publisher *amqp.Publisher) { initialDelay() for { - Tick() + Tick(publisher) time.Sleep(1 * time.Minute) } } -func Tick() { +func Tick(publisher *amqp.Publisher) { // The advisory lock makes sure that only one cleaner is working at a time - _ = database.WithAdvisoryLock(CleanerName, deleteStuckAgents) + _ = database.WithAdvisoryLock(CleanerName, func(db *gorm.DB) error { + return deleteStuckAgents(db, publisher) + }) } -func deleteStuckAgents(db *gorm.DB) error { +func deleteStuckAgents(db *gorm.DB, publisher *amqp.Publisher) error { oneMinAgo := time.Now().Add(-1 * time.Minute) threeMinsAgo := time.Now().Add(-3 * time.Minute) fifteenMinsAgo := time.Now().Add(-15 * time.Minute) - err := db.Where("last_sync_at IS NULL AND created_at < ?", oneMinAgo). - Or(db.Where("last_sync_at < ? AND assigned_job_id IS NULL", threeMinsAgo)). - Or(db.Where("last_sync_at < ?", fifteenMinsAgo)). - Delete(models.Agent{}). - Error + // Find all agents that should be cleaned up in a single query + type AgentInfo struct { + ID string `gorm:"column:id"` + AssignedJobID *string `gorm:"column:assigned_job_id"` + } + + var agents []AgentInfo + err := db.Model(&models.Agent{}). + Select("id, assigned_job_id::text as assigned_job_id"). + Where("(last_sync_at IS NULL AND created_at < ?)", oneMinAgo). + Or("(last_sync_at < ? AND assigned_job_id IS NULL)", threeMinsAgo). + Or("(last_sync_at < ?)", fifteenMinsAgo). + Limit(batchSize). + Scan(&agents).Error + + if err != nil { + log.Printf("[%s] Error while querying agents for cleanup: %s", CleanerName, err.Error()) + return err + } + + if len(agents) == 0 { + log.Printf("[%s] No agents to clean up", CleanerName) + return nil + } + + log.Printf("[%s] Found %d agents to clean up", CleanerName, len(agents)) + + // Process agents and collect IDs to delete + var idsToDelete []string + ctx := context.Background() + + for _, agent := range agents { + // For agents with assigned jobs, handle job finalization first + if agent.AssignedJobID != nil { + jobID := *agent.AssignedJobID + log.Printf("[%s] Agent %s with job %s is being cleaned, marking job as failed", CleanerName, agent.ID, jobID) + + err := publisher.HandleJobFinished(ctx, jobID, "failed") + if err != nil { + log.Printf("[%s] Failed to publish job finalization for job %s: %s", CleanerName, jobID, err.Error()) + // Skip deleting this agent since we couldn't finalize the job + continue + } + } + + // Add to the list of IDs to delete + idsToDelete = append(idsToDelete, agent.ID) + } + + if len(idsToDelete) == 0 { + log.Printf("[%s] No agents to delete after processing", CleanerName) + return nil + } + // Delete only the agents we've processed successfully + log.Printf("[%s] Deleting %d agents", CleanerName, len(idsToDelete)) + err = db.Where("id IN (?)", idsToDelete).Delete(&models.Agent{}).Error if err != nil { - log.Printf("error while deleting stuck agents, err: %s", err.Error()) + log.Printf("[%s] Error while deleting agents: %s", CleanerName, err.Error()) } return err @@ -51,7 +107,7 @@ func initialDelay() { } interval, err := time.ParseDuration(delayInterval) if err != nil { - log.Printf("error while parsing initial delay interval '%s': %v", delayInterval, err) + log.Printf("[%s] Error while parsing initial delay interval '%s': %v", CleanerName, delayInterval, err) return } time.Sleep(interval) diff --git a/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go b/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go index 75f3311b2..a7728f9fb 100644 --- a/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go +++ b/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" "github.com/stretchr/testify/require" @@ -12,6 +13,7 @@ import ( var orgID = database.UUID() var requesterID = database.UUID() +var publisher, _ = amqp.NewPublisher("amqp://guest:guest@rabbitmq:5672") func Test__DeletingStuckAgents(t *testing.T) { database.TruncateTables() @@ -20,7 +22,7 @@ func Test__DeletingStuckAgents(t *testing.T) { t.Run("it doesn't delete agents that didn't sync at all up to 1m after registring", func(t *testing.T) { agent := createAgent(t, at.Name) - Tick() + Tick(publisher) assertAgentExists(t, agent.ID.String()) }) @@ -30,7 +32,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.CreatedAt = &when updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentDoesntExists(t, agent.ID.String()) }) @@ -42,7 +44,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.LastSyncAt = &when updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentExists(t, agent.ID.String()) }) @@ -53,7 +55,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.LastSyncAt = &when updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentDoesntExists(t, agent.ID.String()) }) @@ -67,7 +69,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.AssignedJobID = &job updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentExists(t, agent.ID.String()) }) @@ -81,7 +83,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.AssignedJobID = &job updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentDoesntExists(t, agent.ID.String()) }) diff --git a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go index 4b3badd3b..fc22a2461 100644 --- a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go +++ b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go @@ -1,8 +1,10 @@ package disconnectedcleaner import ( + "context" "time" + "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" log "github.com/sirupsen/logrus" @@ -11,23 +13,30 @@ import ( const DisconnectedCleanerName = "self-hosted-disconnected-agents-cleaner" -func Start() { +func Start(publisher *amqp.Publisher) { for { - Tick() + Tick(publisher) time.Sleep(1 * time.Minute) } } -func Tick() { +func Tick(publisher *amqp.Publisher) { // The advisory lock makes sure that only one cleaner is working at a time - _ = database.WithAdvisoryLock(DisconnectedCleanerName, deleteDisconnectedAgents) + _ = database.WithAdvisoryLock(DisconnectedCleanerName, func(db *gorm.DB) error { + return deleteDisconnectedAgents(db, publisher) + }) } -func deleteDisconnectedAgents(db *gorm.DB) error { - var ids []string +func deleteDisconnectedAgents(db *gorm.DB, publisher *amqp.Publisher) error { + // Find all agents that should be cleaned up in a single query + type AgentInfo struct { + ID string `gorm:"column:id"` + AssignedJobID *string `gorm:"column:assigned_job_id"` + } + var agents []AgentInfo err := db.Raw(` - SELECT id FROM agents AS a + SELECT a.id, a.assigned_job_id::text AS assigned_job_id FROM agents AS a LEFT JOIN agent_types AS at ON at.organization_id = a.organization_id AND at.name = a.agent_type_name @@ -35,28 +44,60 @@ func deleteDisconnectedAgents(db *gorm.DB) error { AND EXTRACT(EPOCH FROM NOW()) > EXTRACT(EPOCH FROM a.disconnected_at) + at.release_name_after LIMIT 100 `, models.AgentStateDisconnected, - ).Scan(&ids).Error + ).Scan(&agents).Error if err != nil { - log.Errorf("Error querying disconnected agents: %v", err) + log.Errorf("[%s] Error querying disconnected agents for cleanup: %v", DisconnectedCleanerName, err) return err } - if len(ids) == 0 { - log.Infof("No agents to delete.") + if len(agents) == 0 { + log.Infof("[%s] No agents to delete.", DisconnectedCleanerName) + return nil + } + + log.Infof("[%s] Found %d disconnected agents to clean up", DisconnectedCleanerName, len(agents)) + + // Process agents and collect IDs to delete + var idsToDelete []string + ctx := context.Background() + + for _, agent := range agents { + // For agents with assigned jobs, handle job finalization first + if agent.AssignedJobID != nil && *agent.AssignedJobID != "" { + jobID := *agent.AssignedJobID + log.Infof("[%s] Disconnected agent %s with job %s is being cleaned, marking job as failed", + DisconnectedCleanerName, agent.ID, jobID) + + err := publisher.HandleJobFinished(ctx, jobID, "failed") + if err != nil { + log.Errorf("[%s] Failed to publish job finalization for job %s: %s", + DisconnectedCleanerName, jobID, err.Error()) + // Skip deleting this agent since we couldn't finalize the job + continue + } + } + + // Add to the list of IDs to delete + idsToDelete = append(idsToDelete, agent.ID) + } + + if len(idsToDelete) == 0 { + log.Infof("[%s] No agents to delete after processing", DisconnectedCleanerName) return nil } - log.Infof("Deleting agents: %v", ids) - dbExec := db.Exec(`DELETE FROM agents WHERE id in ?`, ids) + // Delete only the agents we've processed successfully + log.Infof("[%s] Deleting %d disconnected agents", DisconnectedCleanerName, len(idsToDelete)) + dbExec := db.Exec(`DELETE FROM agents WHERE id IN (?)`, idsToDelete) if dbExec.Error != nil { - log.Errorf("Error deleting disconnected agents: %v", err) - return err + log.Errorf("[%s] Error deleting disconnected agents: %v", DisconnectedCleanerName, dbExec.Error) + return dbExec.Error } - if dbExec.RowsAffected != int64(len(ids)) { - log.Errorf("More agents were deleted than expected: %v", err) - return err + if dbExec.RowsAffected != int64(len(idsToDelete)) { + log.Errorf("[%s] Fewer agents were deleted than expected: expected %d, got %d", + DisconnectedCleanerName, len(idsToDelete), dbExec.RowsAffected) } return nil diff --git a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go index 6641d0a2f..2ee9d673a 100644 --- a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go +++ b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" "github.com/stretchr/testify/require" @@ -13,6 +14,7 @@ import ( var orgID = database.UUID() var requesterID = database.UUID() +var publisher, _ = amqp.NewPublisher("amqp://guest:guest@rabbitmq:5672") func Test__DeletingDisconnectedAgents(t *testing.T) { database.TruncateTables() @@ -25,7 +27,7 @@ func Test__DeletingDisconnectedAgents(t *testing.T) { t.Run("it doesn't delete agents that didn't disconnect", func(t *testing.T) { agent := createAgent(t, at.Name) - Tick() + Tick(publisher) assertAgentExists(t, agent.ID.String()) }) @@ -33,7 +35,7 @@ func Test__DeletingDisconnectedAgents(t *testing.T) { agent1 := createAgent(t, at.Name) agent2 := createAgent(t, at.Name) agent3 := createAgent(t, at.Name) - Tick() + Tick(publisher) // agent 2 and 3 disconnects agent2.Disconnect() @@ -45,7 +47,7 @@ func Test__DeletingDisconnectedAgents(t *testing.T) { // force 2 minutes to pass for agent 3 twoMinsAgo := time.Now().Add(-2 * time.Minute) require.NoError(t, database.Conn().Model(&agent3).Update("disconnected_at", &twoMinsAgo).Error) - Tick() + Tick(publisher) assertAgentExists(t, agent1.ID.String()) assertAgentExists(t, agent2.ID.String()) From ec7550abb506be4badd521a926cf7e0f78af01aa Mon Sep 17 00:00:00 2001 From: Dejan K Date: Wed, 17 Sep 2025 16:53:28 +0200 Subject: [PATCH 2/2] revert(self-hosted-hub): revert changes for disconnected cleaner --- self_hosted_hub/cmd/server/main.go | 16 ++-- .../pkg/workers/disconnectedcleaner/worker.go | 77 +++++-------------- .../disconnectedcleaner/worker_test.go | 8 +- 3 files changed, 27 insertions(+), 74 deletions(-) diff --git a/self_hosted_hub/cmd/server/main.go b/self_hosted_hub/cmd/server/main.go index 4373529bb..cdf42bb47 100644 --- a/self_hosted_hub/cmd/server/main.go +++ b/self_hosted_hub/cmd/server/main.go @@ -75,20 +75,15 @@ func startInternalAPI() { internalapi.RunServer(50051, quotaClient) } -func startAgentCleaners() { - publisher := createPublisher() - go startAgentCleaner(publisher) - go startDisconnectedAgentCleaner(publisher) -} - -func startAgentCleaner(publisher *amqp.Publisher) { +func startAgentCleaner() { log.Println("Starting Agent Cleaner") + publisher := createPublisher() agentcleaner.Start(publisher) } -func startDisconnectedAgentCleaner(publisher *amqp.Publisher) { +func startDisconnectedAgentCleaner() { log.Println("Starting Disconnected Agent Cleaner") - disconnected_cleaner.Start(publisher) + disconnected_cleaner.Start() } func startMetricsCollector() { @@ -174,7 +169,8 @@ func main() { } if os.Getenv("START_AGENT_CLEANER") == "yes" { - startAgentCleaners() + go startAgentCleaner() + go startDisconnectedAgentCleaner() } if os.Getenv("START_METRICS_COLLECTOR") == "yes" { diff --git a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go index fc22a2461..4b3badd3b 100644 --- a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go +++ b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker.go @@ -1,10 +1,8 @@ package disconnectedcleaner import ( - "context" "time" - "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" log "github.com/sirupsen/logrus" @@ -13,30 +11,23 @@ import ( const DisconnectedCleanerName = "self-hosted-disconnected-agents-cleaner" -func Start(publisher *amqp.Publisher) { +func Start() { for { - Tick(publisher) + Tick() time.Sleep(1 * time.Minute) } } -func Tick(publisher *amqp.Publisher) { +func Tick() { // The advisory lock makes sure that only one cleaner is working at a time - _ = database.WithAdvisoryLock(DisconnectedCleanerName, func(db *gorm.DB) error { - return deleteDisconnectedAgents(db, publisher) - }) + _ = database.WithAdvisoryLock(DisconnectedCleanerName, deleteDisconnectedAgents) } -func deleteDisconnectedAgents(db *gorm.DB, publisher *amqp.Publisher) error { - // Find all agents that should be cleaned up in a single query - type AgentInfo struct { - ID string `gorm:"column:id"` - AssignedJobID *string `gorm:"column:assigned_job_id"` - } +func deleteDisconnectedAgents(db *gorm.DB) error { + var ids []string - var agents []AgentInfo err := db.Raw(` - SELECT a.id, a.assigned_job_id::text AS assigned_job_id FROM agents AS a + SELECT id FROM agents AS a LEFT JOIN agent_types AS at ON at.organization_id = a.organization_id AND at.name = a.agent_type_name @@ -44,60 +35,28 @@ func deleteDisconnectedAgents(db *gorm.DB, publisher *amqp.Publisher) error { AND EXTRACT(EPOCH FROM NOW()) > EXTRACT(EPOCH FROM a.disconnected_at) + at.release_name_after LIMIT 100 `, models.AgentStateDisconnected, - ).Scan(&agents).Error + ).Scan(&ids).Error if err != nil { - log.Errorf("[%s] Error querying disconnected agents for cleanup: %v", DisconnectedCleanerName, err) + log.Errorf("Error querying disconnected agents: %v", err) return err } - if len(agents) == 0 { - log.Infof("[%s] No agents to delete.", DisconnectedCleanerName) - return nil - } - - log.Infof("[%s] Found %d disconnected agents to clean up", DisconnectedCleanerName, len(agents)) - - // Process agents and collect IDs to delete - var idsToDelete []string - ctx := context.Background() - - for _, agent := range agents { - // For agents with assigned jobs, handle job finalization first - if agent.AssignedJobID != nil && *agent.AssignedJobID != "" { - jobID := *agent.AssignedJobID - log.Infof("[%s] Disconnected agent %s with job %s is being cleaned, marking job as failed", - DisconnectedCleanerName, agent.ID, jobID) - - err := publisher.HandleJobFinished(ctx, jobID, "failed") - if err != nil { - log.Errorf("[%s] Failed to publish job finalization for job %s: %s", - DisconnectedCleanerName, jobID, err.Error()) - // Skip deleting this agent since we couldn't finalize the job - continue - } - } - - // Add to the list of IDs to delete - idsToDelete = append(idsToDelete, agent.ID) - } - - if len(idsToDelete) == 0 { - log.Infof("[%s] No agents to delete after processing", DisconnectedCleanerName) + if len(ids) == 0 { + log.Infof("No agents to delete.") return nil } - // Delete only the agents we've processed successfully - log.Infof("[%s] Deleting %d disconnected agents", DisconnectedCleanerName, len(idsToDelete)) - dbExec := db.Exec(`DELETE FROM agents WHERE id IN (?)`, idsToDelete) + log.Infof("Deleting agents: %v", ids) + dbExec := db.Exec(`DELETE FROM agents WHERE id in ?`, ids) if dbExec.Error != nil { - log.Errorf("[%s] Error deleting disconnected agents: %v", DisconnectedCleanerName, dbExec.Error) - return dbExec.Error + log.Errorf("Error deleting disconnected agents: %v", err) + return err } - if dbExec.RowsAffected != int64(len(idsToDelete)) { - log.Errorf("[%s] Fewer agents were deleted than expected: expected %d, got %d", - DisconnectedCleanerName, len(idsToDelete), dbExec.RowsAffected) + if dbExec.RowsAffected != int64(len(ids)) { + log.Errorf("More agents were deleted than expected: %v", err) + return err } return nil diff --git a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go index 2ee9d673a..6641d0a2f 100644 --- a/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go +++ b/self_hosted_hub/pkg/workers/disconnectedcleaner/worker_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" "github.com/stretchr/testify/require" @@ -14,7 +13,6 @@ import ( var orgID = database.UUID() var requesterID = database.UUID() -var publisher, _ = amqp.NewPublisher("amqp://guest:guest@rabbitmq:5672") func Test__DeletingDisconnectedAgents(t *testing.T) { database.TruncateTables() @@ -27,7 +25,7 @@ func Test__DeletingDisconnectedAgents(t *testing.T) { t.Run("it doesn't delete agents that didn't disconnect", func(t *testing.T) { agent := createAgent(t, at.Name) - Tick(publisher) + Tick() assertAgentExists(t, agent.ID.String()) }) @@ -35,7 +33,7 @@ func Test__DeletingDisconnectedAgents(t *testing.T) { agent1 := createAgent(t, at.Name) agent2 := createAgent(t, at.Name) agent3 := createAgent(t, at.Name) - Tick(publisher) + Tick() // agent 2 and 3 disconnects agent2.Disconnect() @@ -47,7 +45,7 @@ func Test__DeletingDisconnectedAgents(t *testing.T) { // force 2 minutes to pass for agent 3 twoMinsAgo := time.Now().Add(-2 * time.Minute) require.NoError(t, database.Conn().Model(&agent3).Update("disconnected_at", &twoMinsAgo).Error) - Tick(publisher) + Tick() assertAgentExists(t, agent1.ID.String()) assertAgentExists(t, agent2.ID.String())