Skip to content
A lightweight asynchronous timing task system powered by Go
Branch: master
Clone or download
flike Merge pull request #2 from kingsoft-wps/develop
Add english document for kingtask
Latest commit 68fab69 Nov 5, 2015
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
Godeps ditto Nov 4, 2015
broker
cmd ditto Nov 4, 2015
config add rpc async task Oct 30, 2015
core
doc ditto Nov 4, 2015
etc remove broker addr from work config Oct 23, 2015
task
worker
.gitignore first commit Oct 21, 2015
.travis.yml add travis Oct 23, 2015
License
Makefile
ReadMe.md
dev.sh fix travis error Nov 4, 2015

ReadMe.md

1. kingtask简介

kingtask(English document)是一个由Go开发的异步任务系统。 主要特性包含以下几个部分:

  1. 支持定时的异步任务。
  2. 支持失败重试机制,重试时刻和次数可自定义。
  3. 任务执行结果可查询。
  4. 一个异步任务由一个可执行文件或者一个Web API组成,开发语言不限。
  5. 任务是无状态的,执行异步任务之前,不需要向kingtask注册任务。
  6. broker和worker通过redis解耦。
  7. 通过配置redis为master-slave架构,可实现kingtask的高可用,因为worker是无状态的,redis的master宕机后,可以修改worker配置将其连接到slave上。

2. kingtask架构

kingtask架构图如下所示: 架构图

kingtask的实现步骤如下所述:

  1. broker收到client发送过来的异步任务(一个异步任务由一个唯一的uuid标示)之后,判断异步任务是否定时,如果未定时,则直接将异步任务封装成一个结构体,存入redis。如果定时,则通过定时器触发,将异步任务封装成一个结构体,存入redis。
  2. worker从redis中获取异步任务,或者到任务之后,执行该任务,并将任务结果存入redis。
  3. 对于失败的任务,如果该任务有重试机制,broker会重新发送该任务到redis,然后worker会重新执行。

3. kingtask使用

3.1 配置broker

#broker地址
addr : 0.0.0.0:9595
#redis地址
redis : 127.0.0.1:6379
#log输出到文件,可不配置
#log_path: /Users/flike/src 
#日志级别
log_level: debug

3.2 配置worker

#redis地址
redis : 127.0.0.1:6379
#异步任务可执行文件目录
bin_path : /Users/flike/src
#日志输出目录,可不配置
#log_path : /Users/flike/src
#日志级别
log_level: debug

#每个任务执行时间间隔,单位为秒
period : 1
#结果保存时间,单位为秒
result_keep_time : 1000
#任务执行最长时间,单位秒
task_run_time: 30

3.3 运行broker和worker

#将异步任务的可执行文件放到bin_path目录
cp example /Users/flike/src
#转到kingtask目录
cd kingtask
#启动broker
./bin/broker -config=etc/broker.yaml
#启动worker
./bin/worker -config=etc/worker.yaml

3.3.1 example异步任务源码

异步任务的结果需要输出到标准输出(os.Stdout),出错信息需要输出到标准出错输出(os.Stderr)。

//example.go
package main

import (
	"fmt"
	"os"
	"strconv"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "args count must be two")
		return
	}
	left, err := strconv.ParseInt(os.Args[1], 10, 64)
	if err != nil {
		fmt.Fprintf(os.Stderr, "err:%s", err.Error())
		return
	}
	right, err := strconv.ParseInt(os.Args[2], 10, 64)
	if err != nil {
		fmt.Fprintf(os.Stderr, "err:%s", err.Error())
		return
	}
	sum := left + right
	fmt.Fprintf(os.Stdout, "%d", sum)
}

3.3.2 调用异步任务

kingtask异步任务系统提供Web API接口供客户端调用异步任务,主要有以下API接口:

(1). 执行脚本异步任务API接口

POST /api/v1/task/script

#请求参数
bin_name //字符串类型,表示异步对应的可执行文件名,必须提供
args //字符串类型,执行参数,多个参数用空格分隔,可为空
start_time //整型,异步任务开始执行时刻,为空表示立刻执行,可为空
time_interval //字符串类型,表示失败后重试的时间间隔序列,可为空
max_run_time //整型,异步任务最长运行时间(单位为秒),超过将会被系统kill,为空则使用系统统一的超时时长

#返回值
如果出错返回403和出错信息
如果调用成功返回200和标示该task的uuid,该uuid可用于查询任务结果

例如执行以下API调用:

通过httpie工具执行以下命令
http POST 127.0.0.1:9595/api/v1/task bin_name="mytask" args="12 hello" start_time=1445562622 time_interval="60 600 3600"

则kingtask会执行以下操作:

- 执行/Users/flike/src(由woker配置的目录)目录下的mytask可执行文件,参数是12和hello。
- 异步任务开始执行的时刻是1445562622(时间戳)。
- 如果该异步任务执行失败,kingtask会重试该异步任务,时间间隔是:60s,600s,3600s。

(2). 执行RPC异步任务API接口

POST /api/v1/task/rpc

#请求参数
method //请求类型:GET,PUT,POST,DELETE
url //异步任务对应的URL,需要加单引号
args //json Marshal后的字符串,需要加单引号
start_time //整型,异步任务开始执行时刻,为空表示立刻执行,可为空
time_interval //字符串类型,表示失败后重试的时间间隔序列,可为空
max_run_time //整型,异步任务最长运行时间(单位为秒),超过将会被系统kill,为空则使用系统统一的超时时长

#返回值
如果出错返回403和出错信息
如果调用成功返回200和标示该task的uuid,该uuid可用于查询任务结果

例如执行以下API调用:

通过httpie工具执行以下命令
http POST 127.0.0.1:9595/api/v1/task/rpc method="POST" url="http://127.0.0.1:1323/sum" args='{"a":132,"b":75}'

则kingtask会执行:POST 参数(args)到URL(http://127.0.0.1:1323/sum)

(3). 查看异步任务结果API接口

kingtask中的worker在执行完异步任务之后,都会将异步任务的结果存入redis,结果过期时间可配置。 客户端可通过以下API接口查看异步任务结果:

GET /api/v1/task/result/:uuid

参数是调用执行异步任务返回的uuid。
返回值
如果出错返回403和出错信息
如果调用成功返回200和和任务结果
例如
http GET 127.0.0.1:9595/api/v1/task/result/db3e0b22-a249-4ed2-9532-fc6318ccd321

(4). 统计休息查看

查看积压任务个数

http GET 127.0.0.1:9595/api/v1/task/count/undo
返回值
如果出错返回403和出错信息
如果调用成功返回200和和积压任务个数

查看某一天执行失败的异步任务个数

http GET 127.0.0.1:9595/api/v1/task/result/failure/:date
date参数格式为:2006-01-02

返回值
如果出错返回403和出错信息
如果调用成功返回200和和失败任务个数

查看某一天执行成功的异步任务个数

http GET 127.0.0.1:9595/api/v1/task/result/success/:date
date参数格式为:2006-01-02
返回值
如果出错返回403和出错信息
如果调用成功返回200和和成功任务个数

3.3.3 调用异步任务例子

➜  ~  http POST 127.0.0.1:9595/api/v1/task bin_name="example" args="12 34"
HTTP/1.1 200 OK
Content-Length: 38
Content-Type: application/json; charset=utf-8
Date: Fri, 23 Oct 2015 01:10:22 GMT

"db3e0b22-a249-4ed2-9532-fc6318ccd321"

➜  ~  http GET 127.0.0.1:9595/api/v1/task/result/db3e0b22-a249-4ed2-9532-fc6318ccd321
HTTP/1.1 200 OK
Content-Length: 51
Content-Type: application/json; charset=utf-8
Date: Fri, 23 Oct 2015 01:11:44 GMT

{
    "is_result_exist": 1,
    "is_success": 1,
    "message": "46"
}

http POST 127.0.0.1:9595/api/v1/task/rpc method="POST" url="http://127.0.0.1:1323/sum" args='{"a":132,"b":75}'
HTTP/1.1 200 OK
Content-Length: 38
Content-Type: application/json; charset=utf-8
Date: Fri, 30 Oct 2015 02:34:56 GMT

"e6bfb95a-a619-48b7-acb6-396736486c7c"

➜  ~  http GET 127.0.0.1:9595/api/v1/task/result/e6bfb95a-a619-48b7-acb6-396736486c7c
HTTP/1.1 200 OK
Content-Length: 52
Content-Type: application/json; charset=utf-8
Date: Fri, 30 Oct 2015 02:36:05 GMT

{
    "is_result_exist": 1,
    "is_success": 1,
    "message": "207"
}

➜  ~  http GET 127.0.0.1:9595/api/v1/task/result/success/2015-10-30
HTTP/1.1 200 OK
Content-Length: 1
Content-Type: application/json; charset=utf-8
Date: Fri, 30 Oct 2015 03:08:10 GMT

3

➜  ~  http GET 127.0.0.1:9595/api/v1/task/result/failure/2015-10-30
HTTP/1.1 200 OK
Content-Length: 1
Content-Type: application/json; charset=utf-8
Date: Fri, 30 Oct 2015 03:08:34 GMT

2
You can’t perform that action at this time.