From 8bc1b7f2d285c44e8e5995c92c6070518c2ba8f4 Mon Sep 17 00:00:00 2001 From: Viktor Anderling Date: Mon, 29 May 2017 17:27:56 +0200 Subject: [PATCH 1/2] Allow concurrency for tasks that do not collide Two different concurrency modes are implemented, and they are enabled by setting "concurrency_mode" in the config file to either "project" or "node". When "project" concurrency is enabled, tasks will run in parallel if and only if they do not share the same project ID, with no regard to the nodes/hosts that are affected. When "node" concurrency is enabled, a task will run in parallel if and only if the hosts affected by tasks already running do not intersect with the hosts that would be affected by the task in question. If "concurrency_mode" is not specified, no task will start before the previous one has finished. The collision check is based on the output from the "--list-hosts" argument to ansible, which uses the hosts specified in the inventory. Thus, if two different hostnames are used that point to the same node, such as "127.0.0.1" and "localhost", there will be no collision and two tasks may connect to the same node concurrently. If this behaviour is not desired, one should make sure not to include aliases for their hosts in their inventories when enabling concurrency mode. To restrict the number of parallel tasks that run at the same time, one can add the "max_parallel_tasks" option to the config file. This defaults to a humble 10 if not specified. 
--- api/tasks/pool.go | 96 +++++++++++++++++++++++++++----- api/tasks/runner.go | 133 ++++++++++++++++++++++++++++++++++---------- util/config.go | 8 +++ 3 files changed, 193 insertions(+), 44 deletions(-) diff --git a/api/tasks/pool.go b/api/tasks/pool.go index 19c38a8ae..0508843c9 100644 --- a/api/tasks/pool.go +++ b/api/tasks/pool.go @@ -3,49 +3,115 @@ package tasks import ( "fmt" "time" + + "github.com/ansible-semaphore/semaphore/util" ) type taskPool struct { - queue []*task - register chan *task - running *task + queue []*task + register chan *task + activeProj map[int]*task + activeNodes map[string]*task + running int } var pool = taskPool{ - queue: make([]*task, 0), - register: make(chan *task), - running: nil, + queue: make([]*task, 0), + register: make(chan *task), + activeProj: make(map[int]*task), + activeNodes: make(map[string]*task), + running: 0, +} + +type resourceLock struct { + lock bool + holder *task } +var resourceLocker = make(chan *resourceLock) + func (p *taskPool) run() { - ticker := time.NewTicker(10 * time.Second) + + defer func() { + close(resourceLocker) + }() + + ticker := time.NewTicker(5 * time.Second) defer func() { ticker.Stop() }() + // Lock or unlock resources when running a task + go func (locker <-chan *resourceLock) { + for l := range locker { + t := l.holder + if l.lock { + if p.blocks(t) { + panic("Trying to lock an already locked resource!") + } + p.activeProj[t.projectID] = t + for _, node := range t.hosts { + p.activeNodes[node] = t + } + p.running += 1 + } else { + if p.activeProj[t.projectID] == t { + delete(p.activeProj, t.projectID) + } + for _, node := range t.hosts { + delete(p.activeNodes, node) + } + p.running -= 1 + } + } + }(resourceLocker) + for { select { case task := <-p.register: fmt.Println(task) - if p.running == nil { - go task.run() - continue - } - + go task.prepareRun() p.queue = append(p.queue, task) case <-ticker.C: - if len(p.queue) == 0 || p.running != nil { + if len(p.queue) == 0 { + continue + } 
else if t := p.queue[0]; t.task.Status != "error" && (!t.prepared || p.blocks(t)) { + p.queue = append(p.queue[1:], t) continue } - fmt.Println("Running a task.") - go pool.queue[0].run() + if t := pool.queue[0]; t.task.Status != "error" { + fmt.Println("Running a task.") + resourceLocker <- &resourceLock{lock: true, holder: t,} + go t.run() + } pool.queue = pool.queue[1:] } } } +func (p *taskPool) blocks(t *task) bool { + if p.running >= util.Config.MaxParallelTasks { + return true + } + switch util.Config.ConcurrencyMode { + case "project": + return p.activeProj[t.projectID] != nil + case "node": + collision := false + for _, node := range t.hosts { + if p.activeNodes[node] != nil { + collision = true + break + } + } + return collision + default: + return p.running > 0 + } +} + func StartRunner() { pool.run() } diff --git a/api/tasks/runner.go b/api/tasks/runner.go index d5a3f462f..3d0364f2a 100644 --- a/api/tasks/runner.go +++ b/api/tasks/runner.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" "time" + "regexp" "github.com/ansible-semaphore/semaphore/db" "github.com/ansible-semaphore/semaphore/util" @@ -26,6 +27,8 @@ type task struct { users []int projectID int alert bool + hosts []string + prepared bool alert_chat string } @@ -36,16 +39,11 @@ func (t *task) fail() { t.sendTelegramAlert() } -func (t *task) run() { - pool.running = t +func (t *task) prepareRun() { + t.prepared = false defer func() { - fmt.Println("Stopped running tasks") - pool.running = nil - - now := time.Now() - t.task.End = &now - t.updateStatus() + fmt.Println("Stopped preparing task") objType := "task" desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status) @@ -60,23 +58,16 @@ func (t *task) run() { } }() + t.log("Preparing: " + strconv.Itoa(t.task.ID)) + if err := t.populateDetails(); err != nil { t.log("Error: " + err.Error()) t.fail() return } - { - fmt.Println(t.users) - now := time.Now() - t.task.Status = "running" 
- t.task.Start = &now - - t.updateStatus() - } - objType := "task" - desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is running" + desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is preparing" if err := (db.Event{ ProjectID: &t.projectID, ObjectType: &objType, @@ -87,8 +78,7 @@ func (t *task) run() { panic(err) } - t.log("Started: " + strconv.Itoa(t.task.ID)) - t.log("Run task with template: " + t.template.Alias + "\n") + t.log("Prepare task with template: " + t.template.Alias + "\n") if err := t.installKey(t.repository.SshKey); err != nil { t.log("Failed installing ssh key for repository access: " + err.Error()) @@ -110,6 +100,62 @@ func (t *task) run() { // todo: write environment + if err := t.listPlaybookHosts(); err != nil { + t.log("Listing playbook hosts failed: " + err.Error()) + t.fail() + return + } + + t.prepared = true +} + +func (t *task) run() { + + defer func() { + fmt.Println("Stopped running tasks") + resourceLocker <- &resourceLock{lock: false, holder: t,} + + now := time.Now() + t.task.End = &now + t.updateStatus() + + objType := "task" + desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status) + if err := (db.Event{ + ProjectID: &t.projectID, + ObjectType: &objType, + ObjectID: &t.task.ID, + Description: &desc, + }.Insert()); err != nil { + t.log("Fatal error inserting an event") + panic(err) + } + }() + + { + fmt.Println(t.users) + now := time.Now() + t.task.Status = "running" + t.task.Start = &now + + t.updateStatus() + } + + objType := "task" + desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is running" + if err := (db.Event{ + ProjectID: &t.projectID, + ObjectType: &objType, + ObjectID: &t.task.ID, + Description: &desc, + }.Insert()); err != nil { + t.log("Fatal error inserting an event") + panic(err) + } + + t.log("Started: " + strconv.Itoa(t.task.ID)) + t.log("Run task with 
template: " + t.template.Alias + "\n") + if err := t.runGalaxy(); err != nil { t.log("Running galaxy failed: " + err.Error()) t.fail() @@ -296,7 +342,42 @@ func (t *task) runGalaxy() error { return cmd.Run() } +func (t *task) listPlaybookHosts() error { + args, err := t.getPlaybookArgs() + if err != nil { + return err + } + args = append(args, "--list-hosts") + + cmd := exec.Command("ansible-playbook", args...) + cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID) + cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil) + + out, err := cmd.Output() + re := regexp.MustCompile("(?m)^\\s{6}(.*)$") + matches := re.FindAllSubmatch(out, 20) + hosts := make([]string, len(matches)) + for i, _ := range matches { + hosts[i] = string(matches[i][1]) + } + t.hosts = hosts + return err +} + func (t *task) runPlaybook() error { + args, err := t.getPlaybookArgs() + if err != nil { + return err + } + cmd := exec.Command("ansible-playbook", args...) + cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID) + cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil) + + t.logCmd(cmd) + return cmd.Run() +} + +func (t *task) getPlaybookArgs() ([]string, error) { playbookName := t.task.Playbook if len(playbookName) == 0 { playbookName = t.template.Playbook @@ -323,7 +404,7 @@ func (t *task) runPlaybook() error { err := json.Unmarshal([]byte(t.environment.JSON), &js) if err != nil { t.log("JSON is not valid") - return err + return nil, err } args = append(args, "--extra-vars", t.environment.JSON) @@ -334,7 +415,7 @@ func (t *task) runPlaybook() error { err := json.Unmarshal([]byte(*t.template.Arguments), &extraArgs) if err != nil { t.log("Could not unmarshal arguments to []string") - return err + return nil, err } } @@ -344,13 +425,7 @@ func (t *task) runPlaybook() error { args = append(args, extraArgs...) args = append(args, playbookName) } - - cmd := exec.Command("ansible-playbook", args...) 
- cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID) - cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil) - - t.logCmd(cmd) - return cmd.Run() + return args, nil } func (t *task) envVars(home string, pwd string, gitSSHCommand *string) []string { diff --git a/util/config.go b/util/config.go index f3340fe85..7d2e09200 100644 --- a/util/config.go +++ b/util/config.go @@ -69,6 +69,10 @@ type configType struct { TelegramAlert bool `json:"telegram_alert"` TelegramChat string `json:"telegram_chat"` TelegramToken string `json:"telegram_token"` + + // task concurrency + ConcurrencyMode string `json:"concurrency_mode"` + MaxParallelTasks int `json:"max_parallel_tasks"` } var Config *configType @@ -151,6 +155,10 @@ func init() { Config.TmpPath = "/tmp/semaphore" } + if Config.MaxParallelTasks < 1 { + Config.MaxParallelTasks = 10 + } + var encryption []byte encryption = nil From 56f22f967338a70bd51c95b044f7affdca8c90c9 Mon Sep 17 00:00:00 2001 From: Matej Kramny Date: Sat, 19 Aug 2017 09:45:01 +0100 Subject: [PATCH 2/2] gofmt --- api/tasks/pool.go | 51 ++++++++++++++++++++++++--------------------- api/tasks/runner.go | 6 ++---- util/config.go | 1 - 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/api/tasks/pool.go b/api/tasks/pool.go index 0508843c9..60f6ea6eb 100644 --- a/api/tasks/pool.go +++ b/api/tasks/pool.go @@ -12,7 +12,7 @@ type taskPool struct { register chan *task activeProj map[int]*task activeNodes map[string]*task - running int + running int } var pool = taskPool{ @@ -20,50 +20,53 @@ var pool = taskPool{ register: make(chan *task), activeProj: make(map[int]*task), activeNodes: make(map[string]*task), - running: 0, + running: 0, } type resourceLock struct { - lock bool - holder *task + lock bool + holder *task } var resourceLocker = make(chan *resourceLock) func (p *taskPool) run() { - - defer func() { - close(resourceLocker) - }() - ticker := time.NewTicker(5 * time.Second) defer func() { + close(resourceLocker) 
ticker.Stop() }() // Lock or unlock resources when running a task - go func (locker <-chan *resourceLock) { + go func(locker <-chan *resourceLock) { for l := range locker { t := l.holder + if l.lock { if p.blocks(t) { panic("Trying to lock an already locked resource!") } + p.activeProj[t.projectID] = t + for _, node := range t.hosts { p.activeNodes[node] = t } - p.running += 1 - } else { - if p.activeProj[t.projectID] == t { - delete(p.activeProj, t.projectID) - } - for _, node := range t.hosts { - delete(p.activeNodes, node) - } - p.running -= 1 + + p.running++ + continue + } + + if p.activeProj[t.projectID] == t { + delete(p.activeProj, t.projectID) } + + for _, node := range t.hosts { + delete(p.activeNodes, node) + } + + p.running-- } }(resourceLocker) @@ -83,7 +86,7 @@ func (p *taskPool) run() { if t := pool.queue[0]; t.task.Status != "error" { fmt.Println("Running a task.") - resourceLocker <- &resourceLock{lock: true, holder: t,} + resourceLocker <- &resourceLock{lock: true, holder: t} go t.run() } pool.queue = pool.queue[1:] @@ -95,18 +98,18 @@ func (p *taskPool) blocks(t *task) bool { if p.running >= util.Config.MaxParallelTasks { return true } + switch util.Config.ConcurrencyMode { case "project": return p.activeProj[t.projectID] != nil case "node": - collision := false for _, node := range t.hosts { if p.activeNodes[node] != nil { - collision = true - break + return true } } - return collision + + return false default: return p.running > 0 } diff --git a/api/tasks/runner.go b/api/tasks/runner.go index 3d0364f2a..b6b409d34 100644 --- a/api/tasks/runner.go +++ b/api/tasks/runner.go @@ -8,10 +8,10 @@ import ( "io/ioutil" "os" "os/exec" + "regexp" "strconv" "strings" "time" - "regexp" "github.com/ansible-semaphore/semaphore/db" "github.com/ansible-semaphore/semaphore/util" @@ -110,10 +110,9 @@ func (t *task) prepareRun() { } func (t *task) run() { - defer func() { fmt.Println("Stopped running tasks") - resourceLocker <- &resourceLock{lock: false, holder: t,} 
+ resourceLocker <- &resourceLock{lock: false, holder: t} now := time.Now() t.task.End = &now @@ -133,7 +132,6 @@ func (t *task) run() { }() { - fmt.Println(t.users) now := time.Now() t.task.Status = "running" t.task.Start = &now diff --git a/util/config.go b/util/config.go index 253ee2c6b..f92fdf256 100644 --- a/util/config.go +++ b/util/config.go @@ -350,7 +350,6 @@ func (conf *configType) Scan() { if len(conf.LdapMappings.Mail) == 0 { conf.LdapMappings.Mail = "mail" } - } else { conf.LdapEnable = false }