Skip to content

Commit

Permalink
feat(be): use chan for storing logs to database
Browse files Browse the repository at this point in the history
  • Loading branch information
fiftin committed Aug 30, 2021
1 parent 21c14d8 commit c1c8a9e
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 30 deletions.
15 changes: 5 additions & 10 deletions api/tasks/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tasks
import (
"bufio"
"encoding/json"
"github.com/ansible-semaphore/semaphore/db"
"os/exec"
"time"

Expand All @@ -29,15 +28,11 @@ func (t *task) log(msg string) {
sockets.Message(user, b)
}

go func() {
_, err := t.store.CreateTaskOutput(db.TaskOutput{
TaskID: t.task.ID,
Output: msg,
Time: now,
})

util.LogPanicWithFields(err, log.Fields{"error": "Failed to insert task output"})
}()
pool.logger <- logRecord{
task: t,
output: msg,
time: now,
}
}

// Readln reads from the pipe
Expand Down
25 changes: 21 additions & 4 deletions api/tasks/pool.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package tasks

import (
"github.com/ansible-semaphore/semaphore/db"
"strconv"
"time"

log "github.com/Sirupsen/logrus"
"github.com/ansible-semaphore/semaphore/util"
)

type logRecord struct {
task *task
output string
time time.Time
}

type taskPool struct {
queue []*task
register chan *task
activeProj map[int]*task
activeNodes map[string]*task
running int
runningTasks map[int]*task
logger chan logRecord
}

type resourceLock struct {
Expand All @@ -23,12 +31,13 @@ type resourceLock struct {
}

var pool = taskPool{
queue: make([]*task, 0),
register: make(chan *task),
queue: make([]*task, 0), // queue of waiting tasks
register: make(chan *task), // add task to queue
activeProj: make(map[int]*task),
activeNodes: make(map[string]*task),
running: 0,
runningTasks: make(map[int]*task),
running: 0, // number of running tasks
runningTasks: make(map[int]*task), // working tasks
logger: make(chan logRecord, 1000), // store log records to database
}

var resourceLocker = make(chan *resourceLock)
Expand Down Expand Up @@ -78,12 +87,20 @@ func (p *taskPool) run() {

for {
select {
case record := <-p.logger:
err, _ := record.task.store.CreateTaskOutput(db.TaskOutput{
TaskID: record.task.task.ID,
Output: record.output,
Time: record.time,
})
log.Error(err)
case task := <-p.register:
p.queue = append(p.queue, task)
log.Debug(task)
msg := "Task " + strconv.Itoa(task.task.ID) + " added to queue"
task.log(msg)
log.Info(msg)

case <-ticker.C:
if len(p.queue) == 0 {
continue
Expand Down
37 changes: 21 additions & 16 deletions api/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ func (t *task) fail() {
t.sendTelegramAlert()
}

func (t *task) destroyKeys() {
err := t.destroyKey(t.repository.SSHKey)
if err != nil {
t.log("Can't destroy repository SSH key, error: " + err.Error())
}
err = t.destroyKey(t.inventory.SSHKey)
if err != nil {
t.log("Can't destroy inventory SSH key, error: " + err.Error())
}
}

func (t *task) createTaskEvent() {
objType := taskTypeID
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
Expand Down Expand Up @@ -193,22 +204,8 @@ func (t *task) run() {
now := time.Now()
t.task.End = &now
t.updateStatus()

objType := taskTypeID
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)

_, err := t.store.CreateEvent(db.Event{
UserID: t.task.UserID,
ProjectID: &t.projectID,
ObjectType: &objType,
ObjectID: &t.task.ID,
Description: &desc,
})

if err != nil {
t.log("Fatal error inserting an event")
panic(err)
}
t.createTaskEvent()
t.destroyKeys()
}()

if t.task.Status == taskStoppingStatus {
Expand Down Expand Up @@ -330,6 +327,14 @@ func (t *task) populateDetails() error {
return nil
}

func (t *task) destroyKey(key db.AccessKey) error {
path := key.GetPath()
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
}
return os.Remove(path)
}

func (t *task) installKey(key db.AccessKey) error {
t.log("access key " + key.Name + " installed")

Expand Down

0 comments on commit c1c8a9e

Please sign in to comment.