-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
109 lines (98 loc) · 2.15 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package main
import (
"context"
"log"
"os"
"os/signal"
"strconv"
"time"
sneaker "github.com/oldfritter/sneaker-go/v3"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/oldfritter/sneaker-go/v3/example/config"
"github.com/oldfritter/sneaker-go/v3/example/initializers"
)
// closeChan is the shutdown hand-off channel: recycle sends a value on it
// after it has looped over every worker, and main receives from it (with a
// timeout) before returning. It is unbuffered, so the send in recycle blocks
// until main is receiving.
var (
closeChan = make(chan int)
)
// main boots the worker process. It runs initialization, starts every
// configured worker, supervises the AMQP connection and the workers' channels
// on a 30-second cadence, and, once an interrupt signal arrives, gives
// recycle up to five seconds to stop the workers before returning.
func main() {
	initialize()
	initializers.InitWorkers()

	// stop is closed when shutdown begins so the supervisor goroutine exits
	// instead of reconnecting or re-subscribing workers while recycle is
	// stopping them.
	stop := make(chan struct{})

	go func() {
		const interval = 30 * time.Second

		t := time.NewTimer(interval)
		defer t.Stop()

		for {
			select {
			case <-stop:
				return
			case <-t.C:
			}

			if initializers.IsAmqpConnectionClosed() {
				// The whole connection is gone: rebuild it and restart
				// every worker on top of the new connection.
				initializers.InitializeAmqpConnection()
				StartAllWorkers()
			} else {
				// Connection is alive; repair only the workers whose
				// channel reports closed.
				for _, worker := range config.AllWorkerIs {
					if worker.IsChannelClosed() {
						worker.GetChannel()
						sneaker.SubscribeMessageByQueue(
							worker,
							amqp.Table{},
						)
					}
				}
			}

			// Re-arm only after the pass has finished, so the next check
			// happens a full interval after this one completes.
			t.Reset(interval)
		}
	}()

	StartAllWorkers()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	<-quit
	log.Println("Shutdown Server ...")

	// Tell the supervisor to stand down before workers are stopped.
	// NOTE(review): a pass that is already in flight is not interrupted and
	// may still overlap with recycle; only subsequent passes are prevented.
	close(stop)

	go recycle()

	// Wait for recycle to report completion, but never longer than 5s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	select {
	case <-closeChan:
	case <-ctx.Done():
	}
}
// initialize performs one-time process setup: it loads the AMQP
// configuration, redirects logging via setLog, makes sure the "pids"
// directory exists, and records the current process id in
// pids/workers.pid. Any filesystem failure terminates the process through
// log.Fatalf.
func initialize() {
	initializers.InitializeAmqpConfig()
	setLog()

	if mkErr := os.MkdirAll("pids", 0755); mkErr != nil {
		log.Fatalf("create folder error: %v", mkErr)
	}

	pid := strconv.Itoa(os.Getpid())
	if writeErr := os.WriteFile("pids/workers.pid", []byte(pid), 0644); writeErr != nil {
		log.Fatalf("open file error: %v", writeErr)
	}
}
func StartAllWorkers() {
for _, w := range config.AllWorkerIs {
for i := 0; i < w.GetThreads(); i++ {
go func(w sneaker.WorkerI) {
w.SetRabbitMqConnect(&initializers.RabbitMqConnect)
w.InitLogger()
sneaker.SubscribeMessageByQueue(w, amqp.Table{})
}(w)
}
}
}
// setLog routes the standard logger to logs/workers.log. It creates the
// "logs" directory (an already-existing entry is tolerated), opens the log
// file in append mode, creating it if needed, and installs it as the output
// of the log package. Any other failure terminates the process through
// log.Fatalf. The file is intentionally left open for the life of the
// process.
func setLog() {
	if mkErr := os.Mkdir("logs", 0755); mkErr != nil && !os.IsExist(mkErr) {
		log.Fatalf("create folder error: %v", mkErr)
	}

	const openFlags = os.O_RDWR | os.O_CREATE | os.O_APPEND
	logFile, openErr := os.OpenFile("logs/workers.log", openFlags, 0644)
	if openErr != nil {
		log.Fatalf("open file error: %v", openErr)
	}

	log.SetOutput(logFile)
}
// recycle shuts the workers down one after another: each entry of
// config.AllWorkerIs is stopped, recycled and logged with its name and
// position. When the loop is done it sends on closeChan to tell main that
// shutdown work has finished; that send blocks until main receives.
func recycle() {
	workers := config.AllWorkerIs
	for position := range workers {
		current := workers[position]
		current.Stop()
		current.Recycle()

		name := current.GetName()
		log.Println("stoped: ", name, "[", position, "]")
	}

	closeChan <- 1
}