-
Notifications
You must be signed in to change notification settings - Fork 15
/
task_sync.go
197 lines (186 loc) · 5.34 KB
/
task_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package system
import (
"context"
"database/sql"
"fmt"
"github.com/noovertime7/gin-mysqlbak/agent/agentdao"
"github.com/noovertime7/gin-mysqlbak/agent/agentdto"
"github.com/noovertime7/gin-mysqlbak/agent/agentservice"
"github.com/noovertime7/gin-mysqlbak/agent/repository"
"github.com/noovertime7/gin-mysqlbak/dao"
"github.com/noovertime7/gin-mysqlbak/public"
"github.com/noovertime7/gin-mysqlbak/public/database"
"github.com/noovertime7/mysqlbak/pkg/log"
"github.com/robfig/cron/v3"
"strings"
"sync"
"time"
)
type taskSync struct {
ctx context.Context
c *cron.Cron
lock sync.RWMutex
ReSync string
StoreInfo chan *agentdao.TaskOverview
}
func NewTaskSyncJob(ctx context.Context, ReSync string) *taskSync {
return &taskSync{
ctx: ctx,
c: cron.New(),
lock: sync.RWMutex{},
ReSync: ReSync,
StoreInfo: make(chan *agentdao.TaskOverview, 100),
}
}
func (t *taskSync) Start() {
EntryID, err := t.c.AddJob(t.ReSync, t)
if err != nil {
log.Logger.Errorf("任务同步定时任务启动任务失败,任务ID:%d", EntryID)
return
}
go t.Store(t.ctx)
t.c.Start()
log.Logger.Infof("任务同步定时任务启动任务成功,任务ID:%d", EntryID)
}
func (t *taskSync) Run() {
var affect int
log.Logger.Info("启动集群全局任务管理定时器,开始循环同步任务数据到服务端")
h := agentservice.GetClusterHostService()
ts := agentservice.GetClusterTaskService()
//1、查询当前所有服务
var AgentService *repository.AgentService
services, err := AgentService.GetAgentList(t.ctx, &agentdto.AgentListInput{PageNo: 1, PageSize: 9999})
if err != nil {
log.Logger.Error(err)
}
for _, service := range services.AgentOutPutItem {
//2、通过服务循环查询所有主机
hosts, err := h.HostList(t.ctx, &agentdto.HostListInput{
ServiceName: service.ServiceName,
Info: "",
PageNo: 1,
PageSize: 9999,
})
if err != nil {
t.handleErr(err)
return
}
for _, host := range hosts.ListItem {
//3、通过主机循环查询主机下所有任务
tasks, err := ts.GetTaskUnscopedList(t.ctx, &agentdto.TaskListInput{
ServiceName: service.ServiceName,
HostId: host.ID,
Info: "",
PageNo: 1,
PageSize: 9999,
})
if err != nil {
t.handleErr(err)
return
}
for _, task := range tasks.TaskListItem {
//4、到保存数据库
taskOverDB := &agentdao.TaskOverview{
ServiceName: service.ServiceName,
HostId: host.ID,
Host: host.Host,
Type: host.Type,
TaskId: task.ID,
DbName: task.DBName,
BackupCycle: task.BackupCycle,
KeepNumber: task.KeepNumber,
FinishNum: task.FinishNum,
Status: sql.NullInt64{Int64: task.Status, Valid: true},
CreatedAt: public.StringToTime(task.CreateAt),
UpdateAt: public.StringToTime(task.UpdateAt),
IsDeleted: sql.NullInt64{Int64: task.IsDeleted, Valid: true},
DeletedAt: public.StringToTime(task.DeletedAt),
}
t.StoreInfo <- taskOverDB
affect++
}
}
}
t.storeJobHistory(t.ctx, affect)
return
}
var (
success = true
errMessage []string
)
func (t *taskSync) handleErr(err error) {
success = false
errMessage = append(errMessage, err.Error())
}
func (t *taskSync) GetErr() ([]string, bool) {
return errMessage, success
}
func (t *taskSync) Stop() {
t.c.Stop()
}
func (t *taskSync) storeJobHistory(ctx context.Context, affected int) {
jobHistoryDB := &dao.JobHistory{
JobType: public.TaskSyncJob,
JobCycle: t.ReSync,
Affected: affected,
UpdateTime: time.Now(),
}
errs, ok := t.GetErr()
if !ok {
jobHistoryDB.Status = sql.NullInt64{Int64: 0, Valid: true}
jobHistoryDB.Message = fmt.Sprintf(strings.Join(errs, ";"))
success = true
errMessage = []string{}
} else {
jobHistoryDB.Status = sql.NullInt64{Int64: 1, Valid: true}
jobHistoryDB.Message = "集群任务同步定时任务执行成功"
}
j, err := jobHistoryDB.Find(ctx, database.GetDB(), &dao.JobHistory{JobType: jobHistoryDB.JobType})
if err != nil {
jobHistoryDB.Status = sql.NullInt64{Int64: 0, Valid: true}
jobHistoryDB.Message = err.Error()
}
if j.ID != 0 {
jobHistoryDB.ID = j.ID
if err := jobHistoryDB.Updates(ctx, database.GetDB()); err != nil {
log.Logger.Error("集群任务同步定时任务保存JobHistory数据库失败")
return
}
} else {
if err := jobHistoryDB.Save(ctx, database.GetDB()); err != nil {
log.Logger.Error("集群任务同步定时任务保存JobHistory数据库失败")
return
}
}
}
func (t *taskSync) Store(ctx context.Context) {
tx := database.GetDB()
for {
select {
case taskOverDB := <-t.StoreInfo:
//先去查询一下看看有没有
temp, err := taskOverDB.Find(ctx, tx, &agentdao.TaskOverview{ServiceName: taskOverDB.ServiceName, TaskId: taskOverDB.TaskId, HostId: taskOverDB.HostId})
if err != nil {
t.handleErr(err)
return
}
if temp.ID != 0 {
//更新操作
log.Logger.Debug("任务总览定时器开始更新数据库", temp)
if err := taskOverDB.Updates(ctx, tx, temp.ID); err != nil {
t.handleErr(err)
return
}
} else {
//没有查到,新增操作
log.Logger.Debug("任务总览定时器开始插入数据库", taskOverDB)
if err := taskOverDB.Save(ctx, tx); err != nil {
t.handleErr(err)
return
}
}
case <-ctx.Done():
return
}
}
}