This repository has been archived by the owner on Jun 18, 2020. It is now read-only.
forked from waffle-iron/core0
/
queue.go
91 lines (78 loc) · 1.39 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package pm
import (
"container/list"
"fmt"
"sync"
)
/**
Queue is used for sequential cmds exectuions
*/
type Queue struct {
queues map[string]*list.List
ch chan *jobImb
lock sync.Mutex
o sync.Once
closed bool
}
//Init initializes the queue
func (q *Queue) Init() {
q.o.Do(func() {
q.queues = make(map[string]*list.List)
q.ch = make(chan *jobImb)
})
}
//Close the queue. Queue can't be used after close
func (q *Queue) Close() {
q.lock.Lock()
defer q.lock.Unlock()
q.closed = true
close(q.ch)
}
//Channel return job channel
func (q *Queue) Channel() <-chan *jobImb {
return q.ch
}
//Push a job on queue
func (q *Queue) Push(job *jobImb) error {
q.lock.Lock()
defer q.lock.Unlock()
if q.closed {
return fmt.Errorf("closed queue")
}
name := job.Command().Queue
if name == "" {
q.ch <- job
return nil
}
queue, ok := q.queues[name]
if !ok {
queue = list.New()
q.queues[name] = queue
}
queue.PushBack(job)
if queue.Len() == 1 {
//first job in the queue
q.ch <- job
}
return nil
}
//Notify tell queue that a job execution has completed
func (q *Queue) Notify(job Job) {
q.lock.Lock()
defer q.lock.Unlock()
if q.closed {
return
}
name := job.Command().Queue
queue, ok := q.queues[name]
if !ok {
return
}
queue.Remove(queue.Front())
if queue.Len() == 0 {
delete(q.queues, name)
return
}
next := queue.Front().Value.(*jobImb)
q.ch <- next
}