Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions self_hosted_hub/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ func startPublicAPI() {
panic("Public API port can't be empty")
}

rabbitURL := os.Getenv("RABBITMQ_URL")
if rabbitURL == "" {
panic("Public API needs RABBITMQ_URL")
}

provider, err := configureFeatureProvider()
if err != nil {
panic(err)
Expand All @@ -52,10 +47,7 @@ func startPublicAPI() {
log.Fatalf("Error creating agent counter: %v", err)
}

publisher, err := amqp.NewPublisher(rabbitURL)
if err != nil {
log.Fatalf("Error creating AMQP publisher: %v", err)
}
publisher := createPublisher()

server, err := publicapi.NewServer(quotaClient, agentCounter, publisher)
if err != nil {
Expand Down Expand Up @@ -85,7 +77,8 @@ func startInternalAPI() {

func startAgentCleaner() {
log.Println("Starting Agent Cleaner")
agentcleaner.Start()
publisher := createPublisher()
agentcleaner.Start(publisher)
}

func startDisconnectedAgentCleaner() {
Expand Down Expand Up @@ -188,3 +181,18 @@ func main() {

select {}
}

// Creates a publisher for AMQP
// Panics if RABBITMQ_URL is not set or if there is an error creating the publisher
func createPublisher() *amqp.Publisher {
rabbitURL := os.Getenv("RABBITMQ_URL")
if rabbitURL == "" {
panic("RABBITMQ_URL is required to run the service")
}

publisher, err := amqp.NewPublisher(rabbitURL)
if err != nil {
log.Fatalf("Error creating AMQP publisher: %v", err)
}
return publisher
}
7 changes: 1 addition & 6 deletions self_hosted_hub/pkg/agentsync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,7 @@ func handleFinishedJobState(ctx context.Context, publisher *amqp.Publisher, agen
* it means the agent was using callbacks, so we don't send them again here.
*/
if result != "" {
err = publisher.PublishFinishedCallback(ctx, jobID, string(result))
if err != nil {
return nil, err
}

err = publisher.PublishTeardownFinishedCallback(ctx, jobID)
err = publisher.HandleJobFinished(ctx, jobID, string(result))
if err != nil {
return nil, err
}
Expand Down
18 changes: 18 additions & 0 deletions self_hosted_hub/pkg/amqp/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ func (p *Publisher) PublishTeardownFinishedCallback(ctx context.Context, jobID s
})
}

/*
* This function is used to publish the job finished and teardown finished callbacks.
* It is used by the job-finished sync request and agent cleaners.
*/
func (p *Publisher) HandleJobFinished(ctx context.Context, jobID string, result string) error {
err := p.PublishFinishedCallback(ctx, jobID, result)
if err != nil {
return err
}

err = p.PublishTeardownFinishedCallback(ctx, jobID)
if err != nil {
return err
}

return nil
}

func buildJobFinishedMessage(jobId, result string) ([]byte, error) {

/*
Expand Down
80 changes: 68 additions & 12 deletions self_hosted_hub/pkg/workers/agentcleaner/worker.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,100 @@
package agentcleaner

import (
"context"
"os"
"time"

log "github.com/sirupsen/logrus"

"github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp"
database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database"
models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models"
"gorm.io/gorm"
)

const CleanerName = "self-hosted-agents-cleaner"
const batchSize = 100

func Start() {
func Start(publisher *amqp.Publisher) {
initialDelay()
for {
Tick()
Tick(publisher)
time.Sleep(1 * time.Minute)
}
}

func Tick() {
func Tick(publisher *amqp.Publisher) {
// The advisory lock makes sure that only one cleaner is working at a time
_ = database.WithAdvisoryLock(CleanerName, deleteStuckAgents)
_ = database.WithAdvisoryLock(CleanerName, func(db *gorm.DB) error {
return deleteStuckAgents(db, publisher)
})
}

func deleteStuckAgents(db *gorm.DB) error {
func deleteStuckAgents(db *gorm.DB, publisher *amqp.Publisher) error {
oneMinAgo := time.Now().Add(-1 * time.Minute)
threeMinsAgo := time.Now().Add(-3 * time.Minute)
fifteenMinsAgo := time.Now().Add(-15 * time.Minute)

err := db.Where("last_sync_at IS NULL AND created_at < ?", oneMinAgo).
Or(db.Where("last_sync_at < ? AND assigned_job_id IS NULL", threeMinsAgo)).
Or(db.Where("last_sync_at < ?", fifteenMinsAgo)).
Delete(models.Agent{}).
Error
// Find all agents that should be cleaned up in a single query
type AgentInfo struct {
ID string `gorm:"column:id"`
AssignedJobID *string `gorm:"column:assigned_job_id"`
}

var agents []AgentInfo
err := db.Model(&models.Agent{}).
Select("id, assigned_job_id::text as assigned_job_id").
Where("(last_sync_at IS NULL AND created_at < ?)", oneMinAgo).
Or("(last_sync_at < ? AND assigned_job_id IS NULL)", threeMinsAgo).
Or("(last_sync_at < ?)", fifteenMinsAgo).
Limit(batchSize).
Scan(&agents).Error

if err != nil {
log.Printf("[%s] Error while querying agents for cleanup: %s", CleanerName, err.Error())
return err
}

if len(agents) == 0 {
log.Printf("[%s] No agents to clean up", CleanerName)
return nil
}

log.Printf("[%s] Found %d agents to clean up", CleanerName, len(agents))

// Process agents and collect IDs to delete
var idsToDelete []string
ctx := context.Background()

for _, agent := range agents {
// For agents with assigned jobs, handle job finalization first
if agent.AssignedJobID != nil {
jobID := *agent.AssignedJobID
log.Printf("[%s] Agent %s with job %s is being cleaned, marking job as failed", CleanerName, agent.ID, jobID)

err := publisher.HandleJobFinished(ctx, jobID, "failed")
if err != nil {
log.Printf("[%s] Failed to publish job finalization for job %s: %s", CleanerName, jobID, err.Error())
// Skip deleting this agent since we couldn't finalize the job
continue
}
}

// Add to the list of IDs to delete
idsToDelete = append(idsToDelete, agent.ID)
}

if len(idsToDelete) == 0 {
log.Printf("[%s] No agents to delete after processing", CleanerName)
return nil
}

// Delete only the agents we've processed successfully
log.Printf("[%s] Deleting %d agents", CleanerName, len(idsToDelete))
err = db.Where("id IN (?)", idsToDelete).Delete(&models.Agent{}).Error
if err != nil {
log.Printf("error while deleting stuck agents, err: %s", err.Error())
log.Printf("[%s] Error while deleting agents: %s", CleanerName, err.Error())
}

return err
Expand All @@ -51,7 +107,7 @@ func initialDelay() {
}
interval, err := time.ParseDuration(delayInterval)
if err != nil {
log.Printf("error while parsing initial delay interval '%s': %v", delayInterval, err)
log.Printf("[%s] Error while parsing initial delay interval '%s': %v", CleanerName, delayInterval, err)
return
}
time.Sleep(interval)
Expand Down
14 changes: 8 additions & 6 deletions self_hosted_hub/pkg/workers/agentcleaner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"testing"
"time"

"github.com/semaphoreio/semaphore/self_hosted_hub/pkg/amqp"
database "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/database"
models "github.com/semaphoreio/semaphore/self_hosted_hub/pkg/models"
"github.com/stretchr/testify/require"
)

var orgID = database.UUID()
var requesterID = database.UUID()
var publisher, _ = amqp.NewPublisher("amqp://guest:guest@rabbitmq:5672")

func Test__DeletingStuckAgents(t *testing.T) {
database.TruncateTables()
Expand All @@ -20,7 +22,7 @@ func Test__DeletingStuckAgents(t *testing.T) {

t.Run("it doesn't delete agents that didn't sync at all up to 1m after registring", func(t *testing.T) {
agent := createAgent(t, at.Name)
Tick()
Tick(publisher)
assertAgentExists(t, agent.ID.String())
})

Expand All @@ -30,7 +32,7 @@ func Test__DeletingStuckAgents(t *testing.T) {
agent.CreatedAt = &when
updateAgent(t, agent)

Tick()
Tick(publisher)

assertAgentDoesntExists(t, agent.ID.String())
})
Expand All @@ -42,7 +44,7 @@ func Test__DeletingStuckAgents(t *testing.T) {
agent.LastSyncAt = &when
updateAgent(t, agent)

Tick()
Tick(publisher)

assertAgentExists(t, agent.ID.String())
})
Expand All @@ -53,7 +55,7 @@ func Test__DeletingStuckAgents(t *testing.T) {
agent.LastSyncAt = &when
updateAgent(t, agent)

Tick()
Tick(publisher)

assertAgentDoesntExists(t, agent.ID.String())
})
Expand All @@ -67,7 +69,7 @@ func Test__DeletingStuckAgents(t *testing.T) {
agent.AssignedJobID = &job
updateAgent(t, agent)

Tick()
Tick(publisher)

assertAgentExists(t, agent.ID.String())
})
Expand All @@ -81,7 +83,7 @@ func Test__DeletingStuckAgents(t *testing.T) {
agent.AssignedJobID = &job
updateAgent(t, agent)

Tick()
Tick(publisher)

assertAgentDoesntExists(t, agent.ID.String())
})
Expand Down