forked from xiaonanln/goworld
/
async.go
112 lines (94 loc) · 2.78 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package async
import (
"sync"
"github.com/xiaonanln/goworld/engine/consts"
"github.com/xiaonanln/goworld/engine/gwlog"
"github.com/xiaonanln/goworld/engine/gwutils"
"github.com/xiaonanln/goworld/engine/post"
"golang.org/x/net/context"
)
var (
// asyncRunning and asyncCancelRunning are a cancellable context and its
// cancel function, both derived from context.Background(). Neither is
// referenced anywhere else in the visible part of this file.
asyncRunning, asyncCancelRunning = context.WithCancel(context.Background())
// numAsyncJobWorkersRunning tracks live worker goroutines:
// newAsyncJobWorker adds 1 for each worker it starts, loop calls Done when
// it exits, and WaitClear blocks on Wait.
numAsyncJobWorkersRunning sync.WaitGroup
)
// AsyncCallback is a function that is called with the result and error of an
// async job after the job has finished. A nil AsyncCallback is allowed; its
// callback method then does nothing.
type AsyncCallback func(res interface{}, err error)
// callback hands res and err to ac. The call is not made directly: it is
// wrapped in a closure and submitted through post.Post. A nil ac is silently
// ignored, so jobs may be queued without a callback.
func (ac AsyncCallback) callback(res interface{}, err error) {
	if ac == nil {
		return
	}

	deliver := func() {
		ac(res, err)
	}
	post.Post(deliver)
}
// AsyncRoutine is a function that is executed in an async worker goroutine.
// The result and error it returns are passed on to the job's AsyncCallback.
type AsyncRoutine func() (res interface{}, err error)
// asyncJobWorker owns one queue of pending jobs. The queue is consumed by the
// worker's loop method, which newAsyncJobWorker starts in its own goroutine.
type asyncJobWorker struct {
// jobQueue carries the jobs waiting to be run. newAsyncJobWorker creates it
// as a buffered channel; WaitClear closes it to stop the worker.
jobQueue chan asyncJobItem
}
// asyncJobItem is a single queued job: the routine to run and the callback
// that receives the routine's result and error.
type asyncJobItem struct {
// routine is the work executed by the worker goroutine.
routine AsyncRoutine
// callback receives what routine returned; it may be nil.
callback AsyncCallback
}
// newAsyncJobWorker builds a worker whose job queue is a buffered channel of
// capacity consts.ASYNC_JOB_QUEUE_MAXLEN, registers it with
// numAsyncJobWorkersRunning and starts its loop in a new goroutine.
func newAsyncJobWorker() *asyncJobWorker {
	queue := make(chan asyncJobItem, consts.ASYNC_JOB_QUEUE_MAXLEN)
	worker := &asyncJobWorker{jobQueue: queue}

	// Add happens before the goroutine is started, so the matching Done in
	// loop can never run first.
	numAsyncJobWorkersRunning.Add(1)
	go worker.loop()

	return worker
}
// appendJob packs routine and callback into a job item and sends it on the
// worker's job queue. The send is a plain channel send, so it blocks while the
// queue is full.
func (ajw *asyncJobWorker) appendJob(routine AsyncRoutine, callback AsyncCallback) {
	job := asyncJobItem{
		routine:  routine,
		callback: callback,
	}
	ajw.jobQueue <- job
}
// loop is the body of the worker goroutine. It receives jobs from jobQueue one
// at a time, runs each routine and passes the routine's result and error to
// the job's callback. When loop returns, the worker is marked as done in
// numAsyncJobWorkersRunning.
func (ajw *asyncJobWorker) loop() {
	defer numAsyncJobWorkersRunning.Done()

	// drain returns normally only once jobQueue has been closed and emptied.
	drain := func() {
		for {
			job, ok := <-ajw.jobQueue
			if !ok {
				return
			}

			result, err := job.routine()
			job.callback.callback(result, err)
		}
	}

	// NOTE(review): judging by its name, RepeatUntilPanicless re-invokes drain
	// after a panic — confirm against gwutils. If a routine panics, the
	// callback for that job is never invoked from here.
	gwutils.RepeatUntilPanicless(drain)
}
var (
// asyncJobWorkersLock guards asyncJobWorkers. getAsyncJobWorker takes the
// read lock for lookups and the write lock for inserts; WaitClear takes the
// write lock while closing and resetting the map.
asyncJobWorkersLock sync.RWMutex
// asyncJobWorkers maps a job group name to the worker serving that group.
asyncJobWorkers = map[string]*asyncJobWorker{}
)
// getAsyncJobWorker returns the worker registered for group, creating and
// registering a new one if the group has none yet.
//
// The lookup is first done under the read lock. Only when that finds nothing
// is the write lock taken, and the map is checked again under it before a
// worker is created, because another goroutine may have registered one between
// the two lock acquisitions.
func getAsyncJobWorker(group string) (ajw *asyncJobWorker) {
	asyncJobWorkersLock.RLock()
	worker := asyncJobWorkers[group]
	asyncJobWorkersLock.RUnlock()
	if worker != nil {
		return worker
	}

	asyncJobWorkersLock.Lock()
	worker = asyncJobWorkers[group]
	if worker == nil {
		worker = newAsyncJobWorker()
		asyncJobWorkers[group] = worker
	}
	asyncJobWorkersLock.Unlock()

	return worker
}
// AppendAsyncJob queues routine to be executed asynchronously (not in the game
// goroutine) by the worker that serves group; the worker is created on first
// use of the group. Once routine has run, its result and error are passed to
// callback, which may be nil.
func AppendAsyncJob(group string, routine AsyncRoutine, callback AsyncCallback) {
	getAsyncJobWorker(group).appendJob(routine, callback)
}
// WaitClear closes the job queue of every registered worker, forgets the
// workers, and then blocks until all worker goroutines have exited. It should
// only be called in the game goroutine.
//
// It returns true if at least one worker was registered when it was called,
// and false if there was nothing to clear.
func WaitClear() bool {
	gwlog.Infof("Waiting for all async job workers to be cleared ...")

	asyncJobWorkersLock.Lock()
	cleared := len(asyncJobWorkers) > 0
	if cleared {
		// Closing a queue ends the receive loop of the worker that reads it.
		for group, worker := range asyncJobWorkers {
			close(worker.jobQueue)
			gwlog.Infof("\tclear %s", group)
		}
		// Start over with an empty registry so later jobs get fresh workers.
		asyncJobWorkers = map[string]*asyncJobWorker{}
	}
	asyncJobWorkersLock.Unlock()

	// Wait outside the lock until every worker goroutine has called Done.
	numAsyncJobWorkersRunning.Wait()
	return cleared
}