diff --git a/self_hosted_hub/cmd/server/main.go b/self_hosted_hub/cmd/server/main.go index f36870f2d..cdf42bb47 100644 --- a/self_hosted_hub/cmd/server/main.go +++ b/self_hosted_hub/cmd/server/main.go @@ -31,11 +31,6 @@ func startPublicAPI() { panic("Public API port can't be empty") } - rabbitURL := os.Getenv("RABBITMQ_URL") - if rabbitURL == "" { - panic("Public API needs RABBITMQ_URL") - } - provider, err := configureFeatureProvider() if err != nil { panic(err) @@ -52,10 +47,7 @@ func startPublicAPI() { log.Fatalf("Error creating agent counter: %v", err) } - publisher, err := amqp.NewPublisher(rabbitURL) - if err != nil { - log.Fatalf("Error creating AMQP publisher: %v", err) - } + publisher := createPublisher() server, err := publicapi.NewServer(quotaClient, agentCounter, publisher) if err != nil { @@ -85,7 +77,8 @@ func startInternalAPI() { func startAgentCleaner() { log.Println("Starting Agent Cleaner") - agentcleaner.Start() + publisher := createPublisher() + agentcleaner.Start(publisher) } func startDisconnectedAgentCleaner() { @@ -188,3 +181,18 @@ func main() { select {} } + +// Creates a publisher for AMQP +// Panics if RABBITMQ_URL is not set or if there is an error creating the publisher +func createPublisher() *amqp.Publisher { + rabbitURL := os.Getenv("RABBITMQ_URL") + if rabbitURL == "" { + panic("RABBITMQ_URL is required to run the service") + } + + publisher, err := amqp.NewPublisher(rabbitURL) + if err != nil { + log.Fatalf("Error creating AMQP publisher: %v", err) + } + return publisher +} diff --git a/self_hosted_hub/pkg/agentsync/protocol.go b/self_hosted_hub/pkg/agentsync/protocol.go index 73c5b617e..18e41152c 100644 --- a/self_hosted_hub/pkg/agentsync/protocol.go +++ b/self_hosted_hub/pkg/agentsync/protocol.go @@ -257,12 +257,7 @@ func handleFinishedJobState(ctx context.Context, publisher *amqp.Publisher, agen * it means the agent was using callbacks, so we don't send them again here. */ if result != "" { - err = publisher.PublishFinishedCallback(ctx, jobID, string(result)) - if err != nil { - return nil, err - } - - err = publisher.PublishTeardownFinishedCallback(ctx, jobID) + err = publisher.HandleJobFinished(ctx, jobID, string(result)) if err != nil { return nil, err } diff --git a/self_hosted_hub/pkg/amqp/publisher.go b/self_hosted_hub/pkg/amqp/publisher.go index e096b4401..45452c0cb 100644 --- a/self_hosted_hub/pkg/amqp/publisher.go +++ b/self_hosted_hub/pkg/amqp/publisher.go @@ -95,6 +95,24 @@ func (p *Publisher) PublishTeardownFinishedCallback(ctx context.Context, jobID s }) } +/* + * This function is used to publish the job finished and teardown finished callbacks. + * It is used by the job-finished sync request and agent cleaners. + */ +func (p *Publisher) HandleJobFinished(ctx context.Context, jobID string, result string) error { + err := p.PublishFinishedCallback(ctx, jobID, result) + if err != nil { + return err + } + + err = p.PublishTeardownFinishedCallback(ctx, jobID) + if err != nil { + return err + } + + return nil +} + func buildJobFinishedMessage(jobId, result string) ([]byte, error) { /* diff --git a/self_hosted_hub/pkg/workers/agentcleaner/worker.go b/self_hosted_hub/pkg/workers/agentcleaner/worker.go index a4da907db..27a42cb2e 100644 --- a/self_hosted_hub/pkg/workers/agentcleaner/worker.go +++ b/self_hosted_hub/pkg/workers/agentcleaner/worker.go @@ -1,44 +1,100 @@ package agentcleaner import ( + "context" "os" "time" log "github.com/sirupsen/logrus" + "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" "gorm.io/gorm" ) const CleanerName = "self-hosted-agents-cleaner" +const batchSize = 100 -func Start() { +func Start(publisher *amqp.Publisher) { initialDelay() for { - Tick() + Tick(publisher) time.Sleep(1 * time.Minute) } } -func Tick() { +func Tick(publisher *amqp.Publisher) { // The advisory lock makes sure that only one cleaner is working at a time - _ = database.WithAdvisoryLock(CleanerName, deleteStuckAgents) + _ = database.WithAdvisoryLock(CleanerName, func(db *gorm.DB) error { + return deleteStuckAgents(db, publisher) + }) } -func deleteStuckAgents(db *gorm.DB) error { +func deleteStuckAgents(db *gorm.DB, publisher *amqp.Publisher) error { oneMinAgo := time.Now().Add(-1 * time.Minute) threeMinsAgo := time.Now().Add(-3 * time.Minute) fifteenMinsAgo := time.Now().Add(-15 * time.Minute) - err := db.Where("last_sync_at IS NULL AND created_at < ?", oneMinAgo). - Or(db.Where("last_sync_at < ? AND assigned_job_id IS NULL", threeMinsAgo)). - Or(db.Where("last_sync_at < ?", fifteenMinsAgo)). - Delete(models.Agent{}). - Error + // Find all agents that should be cleaned up in a single query + type AgentInfo struct { + ID string `gorm:"column:id"` + AssignedJobID *string `gorm:"column:assigned_job_id"` + } + + var agents []AgentInfo + err := db.Model(&models.Agent{}). + Select("id, assigned_job_id::text as assigned_job_id"). + Where("(last_sync_at IS NULL AND created_at < ?)", oneMinAgo). + Or("(last_sync_at < ? AND assigned_job_id IS NULL)", threeMinsAgo). + Or("(last_sync_at < ?)", fifteenMinsAgo). + Limit(batchSize). + Scan(&agents).Error + + if err != nil { + log.Printf("[%s] Error while querying agents for cleanup: %s", CleanerName, err.Error()) + return err + } + + if len(agents) == 0 { + log.Printf("[%s] No agents to clean up", CleanerName) + return nil + } + + log.Printf("[%s] Found %d agents to clean up", CleanerName, len(agents)) + + // Process agents and collect IDs to delete + var idsToDelete []string + ctx := context.Background() + + for _, agent := range agents { + // For agents with assigned jobs, handle job finalization first + if agent.AssignedJobID != nil { + jobID := *agent.AssignedJobID + log.Printf("[%s] Agent %s with job %s is being cleaned, marking job as failed", CleanerName, agent.ID, jobID) + + err := publisher.HandleJobFinished(ctx, jobID, "failed") + if err != nil { + log.Printf("[%s] Failed to publish job finalization for job %s: %s", CleanerName, jobID, err.Error()) + // Skip deleting this agent since we couldn't finalize the job + continue + } + } + + // Add to the list of IDs to delete + idsToDelete = append(idsToDelete, agent.ID) + } + + if len(idsToDelete) == 0 { + log.Printf("[%s] No agents to delete after processing", CleanerName) + return nil + } + // Delete only the agents we've processed successfully + log.Printf("[%s] Deleting %d agents", CleanerName, len(idsToDelete)) + err = db.Where("id IN (?)", idsToDelete).Delete(&models.Agent{}).Error if err != nil { - log.Printf("error while deleting stuck agents, err: %s", err.Error()) + log.Printf("[%s] Error while deleting agents: %s", CleanerName, err.Error()) } return err @@ -51,7 +107,7 @@ func initialDelay() { } interval, err := time.ParseDuration(delayInterval) if err != nil { - log.Printf("error while parsing initial delay interval '%s': %v", delayInterval, err) + log.Printf("[%s] Error while parsing initial delay interval '%s': %v", CleanerName, delayInterval, err) return } time.Sleep(interval) diff --git a/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go b/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go index 75f3311b2..a7728f9fb 100644 --- a/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go +++ b/self_hosted_hub/pkg/workers/agentcleaner/worker_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp" database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database" models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models" "github.com/stretchr/testify/require" @@ -12,6 +13,7 @@ import ( var orgID = database.UUID() var requesterID = database.UUID() +var publisher, _ = amqp.NewPublisher("amqp://guest:guest@rabbitmq:5672") func Test__DeletingStuckAgents(t *testing.T) { database.TruncateTables() @@ -20,7 +22,7 @@ func Test__DeletingStuckAgents(t *testing.T) { t.Run("it doesn't delete agents that didn't sync at all up to 1m after registring", func(t *testing.T) { agent := createAgent(t, at.Name) - Tick() + Tick(publisher) assertAgentExists(t, agent.ID.String()) }) @@ -30,7 +32,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.CreatedAt = &when updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentDoesntExists(t, agent.ID.String()) }) @@ -42,7 +44,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.LastSyncAt = &when updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentExists(t, agent.ID.String()) }) @@ -53,7 +55,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.LastSyncAt = &when updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentDoesntExists(t, agent.ID.String()) }) @@ -67,7 +69,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.AssignedJobID = &job updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentExists(t, agent.ID.String()) }) @@ -81,7 +83,7 @@ func Test__DeletingStuckAgents(t *testing.T) { agent.AssignedJobID = &job updateAgent(t, agent) - Tick() + Tick(publisher) assertAgentDoesntExists(t, agent.ID.String()) })