Skip to content

Commit

Permalink
Merge pull request #546 from UnitedTraders/545-prepare-collisions
Browse files Browse the repository at this point in the history
Remove collisions on PrepareRun stage
  • Loading branch information
twhiston authored Jun 8, 2018
2 parents bfe1a4f + b60213a commit 853fe5f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 18 deletions.
36 changes: 26 additions & 10 deletions api/tasks/pool.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package tasks

import (
"fmt"
"strconv"
"time"

log "github.com/Sirupsen/logrus"
"github.com/ansible-semaphore/semaphore/util"
)

Expand Down Expand Up @@ -74,23 +75,38 @@ func (p *taskPool) run() {
for {
select {
case task := <-p.register:
fmt.Println(task)
go task.prepareRun()
p.queue = append(p.queue, task)
log.Debug(task)
msg := "Task " + strconv.Itoa(task.task.ID) + " added to queue"
task.log(msg)
log.Info(msg)
case <-ticker.C:
if len(p.queue) == 0 {
continue
} else if t := p.queue[0]; t.task.Status != taskFailStatus && (!t.prepared || p.blocks(t)) {
}

//get task from top of queue
t := p.queue[0]
if t.task.Status == taskFailStatus {
//delete failed task from queue
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.task.ID) + " removed from queue")
continue
}
if p.blocks(t) {
//move blocked task to end of queue
p.queue = append(p.queue[1:], t)
continue
}

if t := pool.queue[0]; t.task.Status != taskFailStatus {
fmt.Println("Running a task.")
resourceLocker <- &resourceLock{lock: true, holder: t}
go t.run()
log.Info("Set resourse locker with task " + strconv.Itoa(t.task.ID))
resourceLocker <- &resourceLock{lock: true, holder: t}
if !t.prepared {
go t.prepareRun()
continue
}
pool.queue = pool.queue[1:]
go t.run()
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.task.ID) + " removed from queue")
}
}
}
Expand Down
24 changes: 16 additions & 8 deletions api/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ import (
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/ansible-semaphore/semaphore/db"
"github.com/ansible-semaphore/semaphore/util"
)

const (
taskFailStatus = "error"
taskTypeID = "task"
taskTypeID = "task"
)


type task struct {
task db.Task
template db.Template
Expand All @@ -50,7 +50,9 @@ func (t *task) prepareRun() {
t.prepared = false

defer func() {
fmt.Println("Stopped preparing task")
log.Info("Stopped preparing task " + strconv.Itoa(t.task.ID))
log.Info("Release resourse locker with task " + strconv.Itoa(t.task.ID))
resourceLocker <- &resourceLock{lock: false, holder: t}

objType := taskTypeID
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
Expand Down Expand Up @@ -130,7 +132,8 @@ func (t *task) prepareRun() {

func (t *task) run() {
defer func() {
fmt.Println("Stopped running tasks")
log.Info("Stopped running task " + strconv.Itoa(t.task.ID))
log.Info("Release resourse locker with task " + strconv.Itoa(t.task.ID))
resourceLocker <- &resourceLock{lock: false, holder: t}

now := time.Now()
Expand Down Expand Up @@ -299,7 +302,7 @@ func (t *task) updateRepository() error {
repoName := "repository_" + strconv.Itoa(t.repository.ID)
_, err := os.Stat(util.Config.TmpPath + "/" + repoName)

cmd := exec.Command("git")//nolint: gas
cmd := exec.Command("git") //nolint: gas
cmd.Dir = util.Config.TmpPath

gitSSHCommand := "ssh -o StrictHostKeyChecking=no -i " + t.repository.SSHKey.GetPath()
Expand Down Expand Up @@ -335,7 +338,7 @@ func (t *task) runGalaxy() error {
"--force",
}

cmd := exec.Command("ansible-galaxy", args...)//nolint: gas
cmd := exec.Command("ansible-galaxy", args...) //nolint: gas
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)

gitSSHCommand := "ssh -o StrictHostKeyChecking=no -i " + t.repository.SSHKey.GetPath()
Expand All @@ -350,13 +353,18 @@ func (t *task) runGalaxy() error {
}

func (t *task) listPlaybookHosts() (string, error) {

if util.Config.ConcurrencyMode == "project" {
return "", nil
}

args, err := t.getPlaybookArgs()
if err != nil {
return "", err
}
args = append(args, "--list-hosts")

cmd := exec.Command("ansible-playbook", args...)//nolint: gas
cmd := exec.Command("ansible-playbook", args...) //nolint: gas
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)

Expand All @@ -380,7 +388,7 @@ func (t *task) runPlaybook() error {
if err != nil {
return err
}
cmd := exec.Command("ansible-playbook", args...)//nolint: gas
cmd := exec.Command("ansible-playbook", args...) //nolint: gas
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)

Expand Down

0 comments on commit 853fe5f

Please sign in to comment.