Skip to content

Commit

Permalink
remove collisions on prepareTask stage
Browse files Browse the repository at this point in the history
  • Loading branch information
strangeman committed Jun 7, 2018
1 parent 9da6231 commit 087acb8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
18 changes: 11 additions & 7 deletions api/tasks/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,26 @@ func (p *taskPool) run() {
select {
case task := <-p.register:
fmt.Println(task)
go task.prepareRun()
p.queue = append(p.queue, task)
case <-ticker.C:
if len(p.queue) == 0 {
continue
} else if t := p.queue[0]; t.task.Status != taskFailStatus && (!t.prepared || p.blocks(t)) {
} else if t := p.queue[0]; t.task.Status != taskFailStatus && p.blocks(t) {
p.queue = append(p.queue[1:], t)
continue
}

if t := pool.queue[0]; t.task.Status != taskFailStatus {
fmt.Println("Running a task.")
resourceLocker <- &resourceLock{lock: true, holder: t}
go t.run()
if t := p.queue[0]; t.task.Status != taskFailStatus {
if t.prepared {
fmt.Println("Running a task.")
resourceLocker <- &resourceLock{lock: true, holder: t}
go t.run()
p.queue = p.queue[1:]
} else {
resourceLocker <- &resourceLock{lock: true, holder: t}
go t.prepareRun()
}
}
pool.queue = pool.queue[1:]
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions api/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import (

const (
taskFailStatus = "error"
taskTypeID = "task"
taskTypeID = "task"
)


type task struct {
task db.Task
template db.Template
Expand Down Expand Up @@ -51,6 +50,7 @@ func (t *task) prepareRun() {

defer func() {
fmt.Println("Stopped preparing task")
resourceLocker <- &resourceLock{lock: false, holder: t}

This comment has been minimized.

Copy link
@Natanande

Natanande Sep 13, 2018

Contributor

Wont this line always release all hosts it needs when a task prepares, making it able to run no matter if another job blocks it or not?


objType := taskTypeID
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
Expand Down Expand Up @@ -299,7 +299,7 @@ func (t *task) updateRepository() error {
repoName := "repository_" + strconv.Itoa(t.repository.ID)
_, err := os.Stat(util.Config.TmpPath + "/" + repoName)

cmd := exec.Command("git")//nolint: gas
cmd := exec.Command("git") //nolint: gas
cmd.Dir = util.Config.TmpPath

gitSSHCommand := "ssh -o StrictHostKeyChecking=no -i " + t.repository.SSHKey.GetPath()
Expand Down Expand Up @@ -335,7 +335,7 @@ func (t *task) runGalaxy() error {
"--force",
}

cmd := exec.Command("ansible-galaxy", args...)//nolint: gas
cmd := exec.Command("ansible-galaxy", args...) //nolint: gas
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)

gitSSHCommand := "ssh -o StrictHostKeyChecking=no -i " + t.repository.SSHKey.GetPath()
Expand All @@ -350,13 +350,18 @@ func (t *task) runGalaxy() error {
}

func (t *task) listPlaybookHosts() (string, error) {

if util.Config.ConcurrencyMode == "project" {
return "", nil
}

args, err := t.getPlaybookArgs()
if err != nil {
return "", err
}
args = append(args, "--list-hosts")

cmd := exec.Command("ansible-playbook", args...)//nolint: gas
cmd := exec.Command("ansible-playbook", args...) //nolint: gas
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)

Expand All @@ -380,7 +385,7 @@ func (t *task) runPlaybook() error {
if err != nil {
return err
}
cmd := exec.Command("ansible-playbook", args...)//nolint: gas
cmd := exec.Command("ansible-playbook", args...) //nolint: gas
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)

Expand Down

0 comments on commit 087acb8

Please sign in to comment.