/
MiniService.go
243 lines (226 loc) · 7.01 KB
/
MiniService.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package main
import (
	"encoding/json"
	"errors"
	"log"
	"strconv"
	"time"

	"github.com/bububa/cron"
	"github.com/go-xorm/builder"
	consulapi "github.com/hashicorp/consul/api"
	"github.com/mogudy/golib/nbms/alarm"
	"github.com/mogudy/golib/nbms/connector"
	"github.com/mogudy/golib/nbms/logwriter"
	"gopkg.in/validator.v2"
)
// exitOnError is a no-op when err is nil. Otherwise it logs "<msg>: <err>"
// through log.Fatalf, which terminates the process.
func exitOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err.Error())
}
// buildResp encodes a StandardResponse{code, msg, result} as JSON.
//
// It returns the encoded body together with a nil error when code is 200.
// For any other code the body is still returned, and the error carries the
// code as decimal text (e.g. "404").
//
// If result cannot be JSON-encoded, the response is replaced by a code 500
// body describing the encoding failure instead of an empty body.
func buildResp(code int, msg string, result interface{}) ([]byte, error) {
	body, err := json.Marshal(StandardResponse{code, msg, result})
	if err != nil {
		code = 500
		// This fallback holds only an int, a string and nil, so encoding it
		// cannot fail; the second error is deliberately ignored.
		body, _ = json.Marshal(StandardResponse{code, "Response encoding error: " + err.Error(), nil})
	}
	if code == 200 {
		return body, nil
	}
	// strconv.Itoa, not string(code): converting an int with string() yields
	// the character with that code point rather than its decimal digits.
	return body, errors.New(strconv.Itoa(code))
}
// Package-level collaborators shared by the handlers and the scheduler.
// All three are assigned in main before any handler is registered.
var (
	// ca is the service connector used for record storage, outgoing
	// requests and handler registration.
	ca connector.ConsulService
	// cr runs the recurring (cron) jobs, keyed by job name.
	cr *cron.Cron
	// tmm runs functions at a given point in time.
	tmm alarm.Timer
)
// GenericTask is the stored description of one scheduled task.
//
// The xorm tags describe the table mapping; the addtask/deltask tags name the
// fields that must be non-zero when a task is added or deleted.
//
//   - StartedAt is a Unix timestamp in seconds (it is fed to time.Unix).
//   - Schedule is the cron spec; an empty Schedule means a single run.
//   - Repetition limits the number of runs of a cron task; a value <= 0
//     means no limit. Count is the number of successful runs so far.
//   - Method, Service, Api and Param are handed to the connector's
//     SendRequest when the task runs.
//   - HandledBy holds the service id of the instance that scheduled the task.
//   - Uuid doubles as the job name inside the cron scheduler.
type GenericTask struct {
	Id         int64     `xorm:"pk autoincr" json:",omitempty"`
	StartedAt  int64     `xorm:"notnull" addtask:"nonzero"`
	Schedule   string    `xorm:"notnull" json:",omitempty"`
	Repetition int32     `xorm:"notnull default(0)" addtask:"nonzero"`
	Count      int32     `xorm:"notnull default(0)" json:",omitempty"`
	Service    string    `xorm:"notnull" addtask:"nonzero"`
	Api        string    `xorm:"notnull" addtask:"nonzero"`
	Param      string    `xorm:"notnull varchar(1024) default('')" json:",omitempty"`
	Method     string    `xorm:"notnull" addtask:"nonzero"`
	Summary    string    `xorm:"notnull" addtask:"nonzero"`
	CreatedAt  time.Time `xorm:"notnull created" json:",omitempty"`
	HandledAt  time.Time `xorm:"notnull updated" json:",omitempty"`
	HandledBy  string    `xorm:"notnull" json:",omitempty"`
	Version    int32     `xorm:"notnull version" json:",omitempty"`
	Uuid       string    `xorm:"notnull unique" addtask:"nonzero" deltask:"nonzero"`
	DeletedAt  time.Time `xorm:"deleted" json:",omitempty"`
}

// StandardResponse is the JSON envelope produced by buildResp: a status
// code, a short message and an arbitrary result payload.
type StandardResponse struct {
	Code   int
	Msg    string
	Result interface{}
}
// taskVal is the package-level validator for task payloads. Its tag name is
// shared mutable state, so request handlers should validate through a
// WithTag copy rather than calling SetTag on it.
var taskVal = validator.NewValidator()

// addPost adds a new task. param is the JSON encoding of a GenericTask;
// the fields tagged addtask:"nonzero" (StartedAt, Repetition, Service, Api,
// Method, Summary, Uuid) must be set.
//
// The task is stamped with this instance's service id, inserted into the
// store and then handed to addTask for scheduling. The reply is the
// buildResp envelope: 400 for an undecodable or invalid payload, 500 when
// the insert fails, 200 otherwise.
func addPost(param []byte) ([]byte, error) {
	task := new(GenericTask)
	if err := json.Unmarshal(param, task); err != nil {
		// Send the text: a JSON decode error marshals without its message.
		return buildResp(400, "Param invalid: ", err.Error())
	}
	// WithTag validates on a copy, so a concurrent request that selects a
	// different tag cannot change the rule set between here and Validate.
	if err := taskVal.WithTag("addtask").Validate(task); err != nil {
		return buildResp(400, "Param invalid: ", err)
	}
	task.HandledBy = ca.Config().Service.Id
	if _, err := ca.InsertRecord(task); err != nil {
		return buildResp(500, "Db query i error: ", err)
	}
	addTask(task)
	return buildResp(200, "Success: ", "Cron task added")
}
// listGet lists all tasks. It ignores its argument, fetches every
// GenericTask record through ca.FindRecords and returns them as a bare JSON
// array (not wrapped in a StandardResponse). Failures are reported through
// buildResp with code 500.
func listGet([]byte) ([]byte, error) {
	// Start from an empty, non-nil slice so "no tasks" encodes as [].
	tasks := make([]GenericTask, 0)
	if err := ca.FindRecords(&tasks, nil, nil); err != nil {
		return buildResp(500, "Db query s error: ", err)
	}
	payload, err := json.Marshal(tasks)
	if err != nil {
		return buildResp(500, "Db result parsing error: ", err)
	}
	return payload, nil
}
// jobGet returns the details of one task. param is the JSON encoding of a
// GenericTask (typically just its Uuid), which is passed to
// ca.GetFirstRecord as the lookup template.
//
// The reply is the buildResp envelope: 400 for an undecodable payload, 500
// when the lookup fails, 204 when nothing matches and 200 with the task
// otherwise (the task is presumably filled in by GetFirstRecord).
func jobGet(param []byte) ([]byte, error) {
	task := new(GenericTask)
	if err := json.Unmarshal(param, task); err != nil {
		// Send the text: a JSON decode error marshals without its message.
		return buildResp(400, "Param invalid: ", err.Error())
	}
	found, err := ca.GetFirstRecord(task)
	if err != nil {
		return buildResp(500, "Db query s error: ", err)
	}
	if !found {
		return buildResp(204, "No Content", "")
	}
	return buildResp(200, "Success", task)
}
// 删除调度器中的任务(任务uuid)
func delPost(param []byte)([]byte,error){
taskVal.SetTag("deltask")
task := new(GenericTask)
if err := json.Unmarshal(param, task);err!=nil{
return buildResp(400,"Param invalid: ",err)
}
if err := taskVal.Validate(task); err != nil {
return buildResp(400,"Param invalid: ",err)
}
return delJob(task)
}
// delJob removes the job named t.Uuid from the cron scheduler and then
// deletes the task record through ca.DeleteRecord. It answers with a 200
// buildResp envelope on success and a 500 one when the record deletion
// fails.
func delJob(t *GenericTask) ([]byte, error) {
	cr.DeleteJob(t.Uuid)
	_, err := ca.DeleteRecord(t)
	if err == nil {
		return buildResp(200, "Success: ", "Cron task deleted.")
	}
	return buildResp(500, "Db query d error: ", err)
}
// addTask schedules t according to its Schedule and StartedAt fields.
//
// A task without a Schedule runs once through runTask; a task with a
// Schedule is registered as a cron job through addCron. In both cases the
// action happens right away when StartedAt (Unix seconds) is no more than
// five seconds in the future, and is otherwise deferred via tmm.RunFuncAt.
//
// addTask has no return value, so failures of runTask or addCron are logged
// instead of being silently dropped.
func addTask(t *GenericTask) {
	startAt := time.Unix(t.StartedAt, 0)
	deferred := startAt.After(time.Now().Add(5 * time.Second))

	if t.Schedule == "" {
		// Single run.
		runOnce := func() {
			if err := runTask(t); err != nil {
				log.Printf("Task(uuid): %s run failed: %s \n", t.Uuid, err.Error())
			}
		}
		if deferred {
			tmm.RunFuncAt(startAt, runOnce, t.Summary)
		} else {
			runOnce()
		}
		return
	}

	// Cron task.
	register := func() {
		if err := addCron(t); err != nil {
			log.Printf("Task(uuid): %s cron registration failed: %s \n", t.Uuid, err.Error())
		}
	}
	if deferred {
		// NOTE(review): registration happens five seconds after StartedAt,
		// as in the original code; the reason for the offset is not evident
		// here. Confirm it is intended.
		tmm.RunFuncAt(startAt.Add(5*time.Second), register, t.Summary)
	} else {
		register()
	}
}
// addCron registers t with the cron scheduler under the job name t.Uuid and
// the spec t.Schedule.
//
// On every tick the job runs the task while the repetition limit has not
// been reached (a Repetition <= 0 means no limit). Once Count has reached
// Repetition, the next tick deletes the job and its record through delJob.
// A failed run is logged, since the cron callback has no caller to report
// to.
//
// addCron currently always returns nil.
func addCron(t *GenericTask) error {
	cr.AddFunc(t.Uuid, t.Schedule, func() {
		if t.Repetition <= 0 || t.Count < t.Repetition {
			if err := runTask(t); err != nil {
				log.Printf("Task(uuid): %s run failed: %s \n", t.Uuid, err.Error())
			}
			return
		}
		// Repetition limit reached.
		if _, err := delJob(t); err != nil {
			log.Printf("Task(uuid): %s deletion failed. \n", t.Uuid)
			return
		}
		log.Printf("Task(uuid): %s has been deleted. \n", t.Uuid)
	})
	return nil
}
// runTask dispatches the task's request through ca.SendRequest. When the
// request succeeds, the run counter is incremented and the record with the
// task's Id is updated; when it fails, the error is returned and the task
// is left untouched.
func runTask(t *GenericTask) error {
	if err := ca.SendRequest(t.Method, t.Service, t.Api, t.Param); err != nil {
		return err
	}
	t.Count++
	ca.UpdateRecord(t, GenericTask{Id: t.Id})
	return nil
}
// loadTask adopts the stored tasks whose handled_by value is not among the
// given services: each such task is re-assigned to this instance, updated
// in the store and scheduled through addTask.
//
// handler is the service map as returned by the connector's Services call.
// The error of the record query is returned; nil otherwise.
func loadTask(handler map[string]*consulapi.AgentService) error {
	// Collect the names of the known services.
	// NOTE(review): HandledBy is written from ca.Config().Service.Id, while
	// the exclusion list below is built from AgentService.Service. Confirm
	// that the two hold the same kind of value.
	keys := make([]string, 0, len(handler))
	for _, svc := range handler {
		keys = append(keys, svc.Service)
	}

	// Load the tasks that do not belong to any of those services.
	tasks := make([]GenericTask, 0)
	if err := ca.FindRecords(&tasks, builder.NotIn("handled_by", keys)); err != nil {
		return err
	}

	for i := range tasks {
		// Address the slice element, not a range variable. addTask keeps the
		// pointer in closures that run later, and before Go 1.22 a range
		// variable is one shared variable, so every scheduled job would end
		// up operating on the last task.
		t := &tasks[i]
		t.HandledBy = ca.Config().Service.Id
		// Pass the pointer, as runTask does, so whatever the store writes
		// back on update lands in the task being scheduled (presumably the
		// version/updated columns; verify against the connector).
		ca.UpdateRecord(t, GenericTask{Id: t.Id})
		addTask(t)
		log.Printf("Loaded unhandled task(uuid:%s) from db.", t.Uuid)
	}
	return nil
}
// main wires the service together: log writer, cron scheduler and alarm
// timer, the connector (created from config.toml), the GenericTask table,
// the "add" message handler, recovery of unhandled tasks, and finally the
// HTTP API (/add, /del, /list, /job).
//
// Fatal start-up problems go through exitOnError, which ends the process via
// log.Fatalf, so the deferred clean-ups do not run on those paths.
func main() {
	s, err := logwriter.New("app.log")
	exitOnError(err, "Log file creation failed!")
	//log.SetOutput(s)
	log.Println("Application log file rotated")

	cr = cron.New()
	cr.Start()
	defer cr.Stop()
	tmm = alarm.CreateAlarm()
	log.Println("Alarm & Cron core module loaded.")

	cr.AddFunc("app_rotate_log", "0 0 0 * * *", func() {
		// Use a local error: this callback is run by the scheduler, and
		// writing to main's err from here would race with main itself.
		if err := s.Rotate(); err != nil {
			log.Println(err.Error())
		}
	})

	ca, err = connector.CreateService("config.toml")
	exitOnError(err, "Consul service creation failed!")
	defer ca.DeRegister()

	err = ca.CreateDataTable(new(GenericTask))
	exitOnError(err, "Data table creation failed!")
	log.Println("GenericTask data table synchronized")

	err = ca.RegisterMessageHandler("add", func(msg []byte) ([]byte, error) {
		log.Printf("received message: %s \n", string(msg))
		return []byte(""), nil
	})
	exitOnError(err, "Message Queue connection failed!")

	// Load unhandled tasks from the DB. Recovery is best-effort: a failure
	// is logged and the service still starts, but tasks are not adopted on
	// the basis of a service list that could not be fetched.
	svcs, err := ca.Services()
	if err != nil {
		log.Printf("Service list unavailable, unhandled tasks not loaded: %s \n", err.Error())
	} else if err = loadTask(svcs); err != nil {
		log.Printf("Loading unhandled tasks failed: %s \n", err.Error())
	}

	ca.RegisterHttpHandler("/add", connector.POST, addPost)
	log.Printf("Http API registered:%s \n", "/add")
	ca.RegisterHttpHandler("/del", connector.POST, delPost)
	log.Printf("Http API registered:%s \n", "/del")
	ca.RegisterHttpHandler("/list", connector.GET, listGet)
	log.Printf("Http API registered:%s \n", "/list")
	ca.RegisterHttpHandler("/job", connector.GET, jobGet)
	log.Printf("Http API registered:%s \n", "/job")

	exitOnError(ca.StartServer(true), "Http API registration failed!")
}