-
Notifications
You must be signed in to change notification settings - Fork 6
/
accelerator.go
72 lines (61 loc) · 1.54 KB
/
accelerator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package accelerate
import (
"fmt"
"hash/fnv"
)
// Accelerator putting your events and handling process into this accelerator to execute them in concurrency
type Accelerator struct {
size int
channels []chan *ExeRequest
done <-chan struct{}
}
// ExeRequest ...
type ExeRequest struct {
fn func()
}
// NewAccelerator initializing a new accelerator
func NewAccelerator(size int, done <-chan struct{}) *Accelerator {
ac := &Accelerator{
size: size,
done: done,
}
for i := 0; i < size; i++ {
arChan := make(chan *ExeRequest)
ac.channels = append(ac.channels, arChan)
go func(c chan *ExeRequest) {
for {
select {
case r := <-c:
fmt.Printf("Take an accelerate request from a buffer : %v\n", r)
r.fn()
case <-done:
return
}
}
}(arChan)
}
return ac
}
// Accelerate The purpose of this accelerator is twofold:
// 1.dividing the events into a group of channels by calculating hash code of event's name
// 2.there is one goroutine for each channel is waiting for handling the accelerate request
func (ac *Accelerator) Accelerate(fn func(), hashKey string) {
// The case that channels' size is less than 0 means this function could be execute in current goroutine.
if ac.size <= 0 {
fn()
} else {
hashcode := FNV32a(hashKey)
c := ac.channels[int(hashcode)%ac.size]
go func(fn func()) {
c <- &ExeRequest{
fn: fn,
}
}(fn)
}
}
// FNV32a calculating the hash code of a text
func FNV32a(text string) uint32 {
algorithm := fnv.New32a()
algorithm.Write([]byte(text))
return algorithm.Sum32()
}