Skip to content

Commit

Permalink
feat($task): 支持任务同时在多个节点上运行
Browse files Browse the repository at this point in the history
Closes #7
  • Loading branch information
ouqiang committed Aug 6, 2017
1 parent b509fce commit 337ee35
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 117 deletions.
28 changes: 20 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* 任务依赖配置
* 任务类型
* shell任务
> 在任务节点上执行shell命令
> 在任务节点上执行shell命令, 支持任务同时在多个节点上运行
* HTTP任务
> 访问指定的URL地址, 由调度器直接执行, 不依赖任务节点
* 查看任务执行日志
Expand All @@ -33,7 +33,7 @@

## 下载
[v1.0](https://github.com/ouqiang/gocron/releases/tag/v1.0)
[v1.1](https://github.com/ouqiang/gocron/releases/tag/v1.1)


## 安装
Expand All @@ -45,9 +45,9 @@
* 调度器启动
* Windows: `gocron.exe web`
* Linux、Mac OS: `./gocron web`
* 任务节点启动
* Windows: `gocron-node.exe ip:port (默认0.0.0.0:5921)`
* Linux、Mac OS: `./gocron-node ip:port (默认0.0.0.0:5921)`
* 任务节点启动, 默认监听0.0.0.0:5921
* Windows: `gocron-node.exe`
* Linux、Mac OS: `./gocron-node`
4. 浏览器访问 http://localhost:5920

### 源码安装
Expand All @@ -64,12 +64,13 @@
* --host 默认0.0.0.0
* -p 端口, 指定端口, 默认5920
* -e 指定运行环境, dev|test|prod, dev模式下可查看更多日志信息, 默认prod
* -d 后台运行
* -h 查看帮助
* gocron-node ip:port, 默认0.0.0.0:5921
* gocron-node
* -allow-root *nix平台允许以root用户运行
* -s ip:port 监听地址

## 程序使用的组件
* web框架 [Macaron](http://go-macaron.com/)
* Web框架 [Macaron](http://go-macaron.com/)
* 定时任务调度 [Cron](https://github.com/robfig/cron)
* ORM [Xorm](https://github.com/go-xorm/xorm)
* UI框架 [Semantic UI](https://semantic-ui.com/)
Expand All @@ -78,3 +79,14 @@

## 反馈
提交[issue](https://github.com/ouqiang/gocron/issues/new)

## ChangeLog

v1.1
--------

* 任务可同时在多个节点上运行
* *nix平台默认禁止以root用户运行任务节点
* 子任务命令中增加预定义占位符, 子任务可根据主任务运行结果执行相应操作
* 删除守护进程模块
* Web访问日志输出到终端
9 changes: 3 additions & 6 deletions cmd/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ var CmdWeb = cli.Command{
Value: "prod",
Usage: "runtime environment, dev|test|prod",
},
cli.BoolFlag{
Name: "d",
Usage: "-d=true, run as daemon process",
},
},
}

Expand Down Expand Up @@ -155,8 +151,6 @@ func shutdown() {
// 停止所有任务调度
logger.Info("停止定时任务调度")
serviceTask.StopAll()
// 释放gRPC连接池
grpcpool.Pool.ReleaseAll()

taskNumInRunning := service.TaskNum.Num()
logger.Infof("正在运行的任务有%d个", taskNumInRunning)
Expand All @@ -170,4 +164,7 @@ func shutdown() {
time.Sleep(3 * time.Second)
taskNumInRunning = service.TaskNum.Num()
}

// 释放gRPC连接池
grpcpool.Pool.ReleaseAll()
}
2 changes: 1 addition & 1 deletion gocron.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/ouqiang/gocron/cmd"
)

const AppVersion = "1.0"
const AppVersion = "1.1"

func main() {
app := cli.NewApp()
Expand Down
1 change: 1 addition & 0 deletions models/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Host struct {
Port int `xorm:"notnull default 22"` // 主机端口
Remark string `xorm:"varchar(100) notnull default '' "` // 备注
BaseModel `xorm:"-"`
Selected bool `xorm:"-"`
}

// 新增
Expand Down
2 changes: 1 addition & 1 deletion models/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (migration *Migration) Exec(dbName string) error {
setting := new(Setting)
task := new(Task)
tables := []interface{}{
&User{}, task, &TaskLog{}, &Host{}, setting,&LoginLog{},
&User{}, task, &TaskLog{}, &Host{}, setting,&LoginLog{},&TaskHost{},
}
for _, table := range tables {
exist, err:= Db.IsTableExist(table)
Expand Down
4 changes: 2 additions & 2 deletions models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (model *BaseModel) parsePageAndPageSize(params CommonMap) {
if model.Page <= 0 {
model.Page = Page
}
if model.PageSize <= 0 || model.PageSize > MaxPageSize {
model.PageSize = PageSize
if model.PageSize <= 0 {
model.PageSize = MaxPageSize
}
}

Expand Down
120 changes: 67 additions & 53 deletions models/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Task struct {
Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制
Multi int8 `xorm:"tinyint notnull default 1"` // 是否允许多实例运行
RetryTimes int8 `xorm:"tinyint notnull default 0"` // 重试次数
HostId int16 `xorm:"smallint notnull index default 0"` // RPC host id,
NotifyStatus int8 `xorm:"smallint notnull default 1"` // 任务执行结束是否通知 0: 不通知 1: 失败通知 2: 执行结束通知
NotifyType int8 `xorm:"smallint notnull default 0"` // 通知类型 1: 邮件 2: slack
NotifyReceiverId string `xorm:"varchar(256) notnull default '' "` // 通知接受者ID, setting表主键ID,多个ID逗号分隔
Expand All @@ -50,17 +49,11 @@ type Task struct {
Created time.Time `xorm:"datetime notnull created"` // 创建时间
Deleted time.Time `xorm:"datetime deleted"` // 删除时间
BaseModel `xorm:"-"`
Hosts []TaskHostDetail `xorm:"-"`
}

type TaskHost struct {
Task `xorm:"extends"`
Name string
Port int
Alias string
}

func (TaskHost) TableName() string {
return TablePrefix + "task"
func taskHostTableName() []string {
return []string{TablePrefix + "task_host", "th"}
}

// 新增
Expand Down Expand Up @@ -88,7 +81,7 @@ func (task *Task) CreateTestTask() {

func (task *Task) UpdateBean(id int) (int64, error) {
return Db.ID(id).
Cols("name,spec,protocol,command,timeout,multi,retry_times,host_id,remark,notify_status,notify_type,notify_receiver_id, dependency_task_id, dependency_status").
Cols("name,spec,protocol,command,timeout,multi,retry_times,remark,notify_status,notify_type,notify_receiver_id, dependency_task_id, dependency_status").
Update(task)
}

Expand All @@ -113,36 +106,48 @@ func (task *Task) Enable(id int) (int64, error) {
}

// 获取所有激活任务
func (task *Task) ActiveList() ([]TaskHost, error) {
list := make([]TaskHost, 0)
fields := "t.*, host.alias,host.name,host.port"
err := Db.Alias("t").
Join("LEFT", hostTableName(), "t.host_id=host.id").
Where("t.status = ? AND t.level = ?", Enabled, TaskLevelParent).
Cols(fields).
func (task *Task) ActiveList() ([]Task, error) {
list := make([]Task, 0)
err := Db.Where("status = ? AND level = ?", Enabled, TaskLevelParent).
Find(&list)

return list, err
if err != nil {
return list, err
}

return task.setHostsForTasks(list)
}

// 获取某个主机下的所有激活任务
func (task *Task) ActiveListByHostId(hostId int16) ([]TaskHost, error) {
list := make([]TaskHost, 0)
fields := "t.*, host.alias,host.name,host.port"
err := Db.Alias("t").
Join("LEFT", hostTableName(), "t.host_id=host.id").
Where("t.status = ? AND t.host_id = ? AND t.level = ?", Enabled, hostId, TaskLevelParent).
Cols(fields).
func (task *Task) ActiveListByHostId(hostId int16) ([]Task, error) {
taskHostModel := new(TaskHost)
taskIds, err := taskHostModel.GetTaskIdsByHostId(hostId)
if err != nil {
return nil, err
}
list := make([]Task, 0)
err = Db.Where("status = ? AND level = ?", Enabled, TaskLevelParent).
In("id", taskIds...).
Find(&list)
if err != nil {
return list, err
}

return list, err
return task.setHostsForTasks(list)
}

// 判断主机id是否有引用
func (task *Task) HostIdExist(hostId int16) (bool, error) {
count, err := Db.Where("host_id = ?", hostId).Count(task);
func (task *Task) setHostsForTasks(tasks []Task) ([]Task, error) {
taskHostModel := new(TaskHost)
var err error
for i, value := range tasks {
taskHostDetails, err := taskHostModel.GetHostIdsByTaskId(value.Id)
if err != nil {
return nil, err
}
tasks[i].Hosts = taskHostDetails
}

return count > 0, err
return tasks, err
}

// 判断任务名称是否存在
Expand All @@ -168,28 +173,37 @@ func (task *Task) GetStatus(id int) (Status, error) {
return task.Status, nil
}

func(task *Task) Detail(id int) (TaskHost, error) {
taskHost := TaskHost{}
fields := "t.*, host.alias,host.name,host.port"
_, err := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id").Where("t.id=?", id).Cols(fields).Get(&taskHost)
func(task *Task) Detail(id int) (Task, error) {
t := Task{}
_, err := Db.Where("id=?", id).Get(&t)

if err != nil {
return t, err
}

taskHostModel := new(TaskHost)
t.Hosts, err = taskHostModel.GetHostIdsByTaskId(id)

return taskHost, err
return t, err
}

func (task *Task) List(params CommonMap) ([]TaskHost, error) {
func (task *Task) List(params CommonMap) ([]Task, error) {
task.parsePageAndPageSize(params)
list := make([]TaskHost, 0)
fields := "t.*, host.alias,host.name"
session := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id")
list := make([]Task, 0)
session := Db.Alias("t").Join("LEFT", taskHostTableName(), "t.id = th.task_id")
task.parseWhere(session, params)
err := session.Cols(fields).Desc("t.id").Limit(task.PageSize, task.pageLimitOffset()).Find(&list)
err := session.GroupBy("t.id").Desc("t.id").Cols("t.*").Limit(task.PageSize, task.pageLimitOffset()).Find(&list)

return list, err
if err != nil {
return nil, err
}

return task.setHostsForTasks(list)
}

// 获取依赖任务列表
func (task *Task) GetDependencyTaskList(ids string) ([]TaskHost, error) {
list := make([]TaskHost, 0)
func (task *Task) GetDependencyTaskList(ids string) ([]Task, error) {
list := make([]Task, 0)
if ids == "" {
return list, nil
}
Expand All @@ -198,21 +212,24 @@ func (task *Task) GetDependencyTaskList(ids string) ([]TaskHost, error) {
for i, v := range idList {
taskIds[i] = v
}
fields := "t.*, host.alias,host.name,host.port"
fields := "t.*"
err := Db.Alias("t").
Join("LEFT", hostTableName(), "t.host_id=host.id").
Where("t.level = ?", TaskLevelChild).
In("t.id", taskIds).
Cols(fields).
Find(&list)

return list, err
if err != nil {
return list, err
}

return task.setHostsForTasks(list)
}

func (task *Task) Total(params CommonMap) (int64, error) {
session := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id")
session := Db.Alias("t").Join("LEFT", taskHostTableName(), "t.id = th.task_id")
task.parseWhere(session, params)
return session.Count(task)
return session.GroupBy("t.id").Count(task)
}

// 解析where
Expand All @@ -226,7 +243,7 @@ func (task *Task) parseWhere(session *xorm.Session, params CommonMap) {
}
hostId, ok := params["HostId"]
if ok && hostId.(int) > 0 {
session.And("host_id = ?", hostId)
session.And("th.host_id = ?", hostId)
}
name, ok := params["Name"]
if ok && name.(string) != "" {
Expand All @@ -242,6 +259,3 @@ func (task *Task) parseWhere(session *xorm.Session, params CommonMap) {
}
}

func hostTableName() []string {
return []string{TablePrefix + "host", "host"}
}
Loading

0 comments on commit 337ee35

Please sign in to comment.