Skip to content

Commit

Permalink
Merge branch 'TeliaSweden-develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
matejkramny committed Aug 19, 2017
2 parents 630d967 + 56f22f9 commit b9c2dbe
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 45 deletions.
99 changes: 84 additions & 15 deletions api/tasks/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,118 @@ package tasks
import (
"fmt"
"time"

"github.com/ansible-semaphore/semaphore/util"
)

type taskPool struct {
queue []*task
register chan *task
running *task
queue []*task
register chan *task
activeProj map[int]*task
activeNodes map[string]*task
running int
}

var pool = taskPool{
queue: make([]*task, 0),
register: make(chan *task),
running: nil,
queue: make([]*task, 0),
register: make(chan *task),
activeProj: make(map[int]*task),
activeNodes: make(map[string]*task),
running: 0,
}

type resourceLock struct {
lock bool
holder *task
}

var resourceLocker = make(chan *resourceLock)

func (p *taskPool) run() {
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(5 * time.Second)

defer func() {
close(resourceLocker)
ticker.Stop()
}()

// Lock or unlock resources when running a task
go func(locker <-chan *resourceLock) {
for l := range locker {
t := l.holder

if l.lock {
if p.blocks(t) {
panic("Trying to lock an already locked resource!")
}

p.activeProj[t.projectID] = t

for _, node := range t.hosts {
p.activeNodes[node] = t
}

p.running++
continue
}

if p.activeProj[t.projectID] == t {
delete(p.activeProj, t.projectID)
}

for _, node := range t.hosts {
delete(p.activeNodes, node)
}

p.running--
}
}(resourceLocker)

for {
select {
case task := <-p.register:
fmt.Println(task)
if p.running == nil {
go task.run()
continue
}

go task.prepareRun()
p.queue = append(p.queue, task)
case <-ticker.C:
if len(p.queue) == 0 || p.running != nil {
if len(p.queue) == 0 {
continue
} else if t := p.queue[0]; t.task.Status != "error" && (!t.prepared || p.blocks(t)) {
p.queue = append(p.queue[1:], t)
continue
}

fmt.Println("Running a task.")
go pool.queue[0].run()
if t := pool.queue[0]; t.task.Status != "error" {
fmt.Println("Running a task.")
resourceLocker <- &resourceLock{lock: true, holder: t}
go t.run()
}
pool.queue = pool.queue[1:]
}
}
}

func (p *taskPool) blocks(t *task) bool {
if p.running >= util.Config.MaxParallelTasks {
return true
}

switch util.Config.ConcurrencyMode {
case "project":
return p.activeProj[t.projectID] != nil
case "node":
for _, node := range t.hosts {
if p.activeNodes[node] != nil {
return true
}
}

return false
default:
return p.running > 0
}
}

func StartRunner() {
pool.run()
}
131 changes: 102 additions & 29 deletions api/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +27,8 @@ type task struct {
users []int
projectID int
alert bool
hosts []string
prepared bool
alert_chat string
}

Expand All @@ -36,16 +39,11 @@ func (t *task) fail() {
t.sendTelegramAlert()
}

func (t *task) run() {
pool.running = t
func (t *task) prepareRun() {
t.prepared = false

defer func() {
fmt.Println("Stopped running tasks")
pool.running = nil

now := time.Now()
t.task.End = &now
t.updateStatus()
fmt.Println("Stopped preparing task")

objType := "task"
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
Expand All @@ -60,23 +58,16 @@ func (t *task) run() {
}
}()

t.log("Preparing: " + strconv.Itoa(t.task.ID))

if err := t.populateDetails(); err != nil {
t.log("Error: " + err.Error())
t.fail()
return
}

{
fmt.Println(t.users)
now := time.Now()
t.task.Status = "running"
t.task.Start = &now

t.updateStatus()
}

objType := "task"
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is running"
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is preparing"
if err := (db.Event{
ProjectID: &t.projectID,
ObjectType: &objType,
Expand All @@ -87,8 +78,7 @@ func (t *task) run() {
panic(err)
}

t.log("Started: " + strconv.Itoa(t.task.ID))
t.log("Run task with template: " + t.template.Alias + "\n")
t.log("Prepare task with template: " + t.template.Alias + "\n")

if err := t.installKey(t.repository.SshKey); err != nil {
t.log("Failed installing ssh key for repository access: " + err.Error())
Expand All @@ -110,6 +100,60 @@ func (t *task) run() {

// todo: write environment

if err := t.listPlaybookHosts(); err != nil {
t.log("Listing playbook hosts failed: " + err.Error())
t.fail()
return
}

t.prepared = true
}

func (t *task) run() {
defer func() {
fmt.Println("Stopped running tasks")
resourceLocker <- &resourceLock{lock: false, holder: t}

now := time.Now()
t.task.End = &now
t.updateStatus()

objType := "task"
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
if err := (db.Event{
ProjectID: &t.projectID,
ObjectType: &objType,
ObjectID: &t.task.ID,
Description: &desc,
}.Insert()); err != nil {
t.log("Fatal error inserting an event")
panic(err)
}
}()

{
now := time.Now()
t.task.Status = "running"
t.task.Start = &now

t.updateStatus()
}

objType := "task"
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is running"
if err := (db.Event{
ProjectID: &t.projectID,
ObjectType: &objType,
ObjectID: &t.task.ID,
Description: &desc,
}.Insert()); err != nil {
t.log("Fatal error inserting an event")
panic(err)
}

t.log("Started: " + strconv.Itoa(t.task.ID))
t.log("Run task with template: " + t.template.Alias + "\n")

if err := t.runGalaxy(); err != nil {
t.log("Running galaxy failed: " + err.Error())
t.fail()
Expand Down Expand Up @@ -296,7 +340,42 @@ func (t *task) runGalaxy() error {
return cmd.Run()
}

func (t *task) listPlaybookHosts() error {
args, err := t.getPlaybookArgs()
if err != nil {
return err
}
args = append(args, "--list-hosts")

cmd := exec.Command("ansible-playbook", args...)
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)

out, err := cmd.Output()
re := regexp.MustCompile("(?m)^\\s{6}(.*)$")
matches := re.FindAllSubmatch(out, 20)
hosts := make([]string, len(matches))
for i, _ := range matches {
hosts[i] = string(matches[i][1])
}
t.hosts = hosts
return err
}

func (t *task) runPlaybook() error {
args, err := t.getPlaybookArgs()
if err != nil {
return err
}
cmd := exec.Command("ansible-playbook", args...)
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)

t.logCmd(cmd)
return cmd.Run()
}

func (t *task) getPlaybookArgs() ([]string, error) {
playbookName := t.task.Playbook
if len(playbookName) == 0 {
playbookName = t.template.Playbook
Expand All @@ -323,7 +402,7 @@ func (t *task) runPlaybook() error {
err := json.Unmarshal([]byte(t.environment.JSON), &js)
if err != nil {
t.log("JSON is not valid")
return err
return nil, err
}

args = append(args, "--extra-vars", t.environment.JSON)
Expand All @@ -334,7 +413,7 @@ func (t *task) runPlaybook() error {
err := json.Unmarshal([]byte(*t.template.Arguments), &extraArgs)
if err != nil {
t.log("Could not unmarshal arguments to []string")
return err
return nil, err
}
}

Expand All @@ -344,13 +423,7 @@ func (t *task) runPlaybook() error {
args = append(args, extraArgs...)
args = append(args, playbookName)
}

cmd := exec.Command("ansible-playbook", args...)
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)

t.logCmd(cmd)
return cmd.Run()
return args, nil
}

func (t *task) envVars(home string, pwd string, gitSSHCommand *string) []string {
Expand Down
9 changes: 8 additions & 1 deletion util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type configType struct {
TelegramAlert bool `json:"telegram_alert"`
TelegramChat string `json:"telegram_chat"`
TelegramToken string `json:"telegram_token"`

// task concurrency
ConcurrencyMode string `json:"concurrency_mode"`
MaxParallelTasks int `json:"max_parallel_tasks"`
}

var Config *configType
Expand Down Expand Up @@ -151,6 +155,10 @@ func init() {
Config.TmpPath = "/tmp/semaphore"
}

if Config.MaxParallelTasks < 1 {
Config.MaxParallelTasks = 10
}

var encryption []byte
encryption = nil

Expand Down Expand Up @@ -342,7 +350,6 @@ func (conf *configType) Scan() {
if len(conf.LdapMappings.Mail) == 0 {
conf.LdapMappings.Mail = "mail"
}

} else {
conf.LdapEnable = false
}
Expand Down

0 comments on commit b9c2dbe

Please sign in to comment.