Skip to content

Commit

Permalink
gofmt
Browse files Browse the repository at this point in the history
  • Loading branch information
matejkramny committed Aug 19, 2017
1 parent 27d2e37 commit 56f22f9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
51 changes: 27 additions & 24 deletions api/tasks/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,61 @@ type taskPool struct {
register chan *task
activeProj map[int]*task
activeNodes map[string]*task
running int
running int
}

var pool = taskPool{
queue: make([]*task, 0),
register: make(chan *task),
activeProj: make(map[int]*task),
activeNodes: make(map[string]*task),
running: 0,
running: 0,
}

type resourceLock struct {
lock bool
holder *task
lock bool
holder *task
}

var resourceLocker = make(chan *resourceLock)

func (p *taskPool) run() {

defer func() {
close(resourceLocker)
}()

ticker := time.NewTicker(5 * time.Second)

defer func() {
close(resourceLocker)
ticker.Stop()
}()

// Lock or unlock resources when running a task
go func (locker <-chan *resourceLock) {
go func(locker <-chan *resourceLock) {
for l := range locker {
t := l.holder

if l.lock {
if p.blocks(t) {
panic("Trying to lock an already locked resource!")
}

p.activeProj[t.projectID] = t

for _, node := range t.hosts {
p.activeNodes[node] = t
}
p.running += 1
} else {
if p.activeProj[t.projectID] == t {
delete(p.activeProj, t.projectID)
}
for _, node := range t.hosts {
delete(p.activeNodes, node)
}
p.running -= 1

p.running++
continue
}

if p.activeProj[t.projectID] == t {
delete(p.activeProj, t.projectID)
}

for _, node := range t.hosts {
delete(p.activeNodes, node)
}

p.running--
}
}(resourceLocker)

Expand All @@ -83,7 +86,7 @@ func (p *taskPool) run() {

if t := pool.queue[0]; t.task.Status != "error" {
fmt.Println("Running a task.")
resourceLocker <- &resourceLock{lock: true, holder: t,}
resourceLocker <- &resourceLock{lock: true, holder: t}
go t.run()
}
pool.queue = pool.queue[1:]
Expand All @@ -95,18 +98,18 @@ func (p *taskPool) blocks(t *task) bool {
if p.running >= util.Config.MaxParallelTasks {
return true
}

switch util.Config.ConcurrencyMode {
case "project":
return p.activeProj[t.projectID] != nil
case "node":
collision := false
for _, node := range t.hosts {
if p.activeNodes[node] != nil {
collision = true
break
return true
}
}
return collision

return false
default:
return p.running > 0
}
Expand Down
6 changes: 2 additions & 4 deletions api/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"io/ioutil"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
"regexp"

"github.com/ansible-semaphore/semaphore/db"
"github.com/ansible-semaphore/semaphore/util"
Expand Down Expand Up @@ -110,10 +110,9 @@ func (t *task) prepareRun() {
}

func (t *task) run() {

defer func() {
fmt.Println("Stopped running tasks")
resourceLocker <- &resourceLock{lock: false, holder: t,}
resourceLocker <- &resourceLock{lock: false, holder: t}

now := time.Now()
t.task.End = &now
Expand All @@ -133,7 +132,6 @@ func (t *task) run() {
}()

{
fmt.Println(t.users)
now := time.Now()
t.task.Status = "running"
t.task.Start = &now
Expand Down
1 change: 0 additions & 1 deletion util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ func (conf *configType) Scan() {
if len(conf.LdapMappings.Mail) == 0 {
conf.LdapMappings.Mail = "mail"
}

} else {
conf.LdapEnable = false
}
Expand Down

0 comments on commit 56f22f9

Please sign in to comment.