Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,41 @@ URL地址 `/delete`
|:-------:|:-----------:|:------------:|:-----------------:|
| id | string | Job唯一标识 | |


### 查询任务
URL地址 `/get`

```json
{
"id": "15702398321"
}
```

| 参数名 | 类型 | 含义 | 备注 |
|:-------:|:-----------:|:------------:|:-----------------:|
| id | string | Job唯一标识 | |



队列中有任务返回值
```json
{
"code": 0,
"message": "操作成功",
"data": {
"id": "15702398321",
"body": "{\"uid\": 10829378,\"created\": 1498657365 }"
}
}
```
队列为空返回值
```json
{
"code": 0,
"message": "操作成功",
"data": null
}
```

### 完成任务
URL地址 `/finish`
Expand Down
8 changes: 5 additions & 3 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package cmd
import (
"flag"
"fmt"
"github.com/ouqiang/delay-queue/config"
"github.com/ouqiang/delay-queue/delayqueue"
"github.com/ouqiang/delay-queue/routers"
"log"
"net/http"
_ "net/http/pprof"
"os"

"github.com/ouqiang/delay-queue/config"
"github.com/ouqiang/delay-queue/delayqueue"
"github.com/ouqiang/delay-queue/routers"
)

type Cmd struct{}
Expand Down Expand Up @@ -53,6 +54,7 @@ func (cmd *Cmd) runWeb() {
http.HandleFunc("/pop", routers.Pop)
http.HandleFunc("/finish", routers.Delete)
http.HandleFunc("/delete", routers.Delete)
http.HandleFunc("/get", routers.Get)

log.Printf("listen %s\n", config.Setting.BindAddress)
err := http.ListenAndServe(config.Setting.BindAddress, nil)
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package config

import (
"gopkg.in/ini.v1"
"log"

"gopkg.in/ini.v1"
)

// 解析配置文件
Expand Down
17 changes: 16 additions & 1 deletion delayqueue/delay_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package delayqueue
import (
"errors"
"fmt"
"github.com/ouqiang/delay-queue/config"
"log"
"time"

"github.com/ouqiang/delay-queue/config"
)

var (
Expand Down Expand Up @@ -75,6 +76,20 @@ func Remove(jobId string) error {
return removeJob(jobId)
}

// 查询Job
func Get(jobId string) (*Job, error) {
job, err := getJob(jobId)
if err != nil {
return job, err
}

// 消息不存在, 可能已被删除
if job == nil {
return nil, nil
}
return job, err
}

// 轮询获取Job名称, 使job分布到不同bucket中, 提高扫描速度
func generateBucketName() <-chan string {
c := make(chan string)
Expand Down
1 change: 1 addition & 0 deletions delayqueue/ready_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package delayqueue

import (
"fmt"

"github.com/ouqiang/delay-queue/config"
)

Expand Down
5 changes: 3 additions & 2 deletions delayqueue/redis.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package delayqueue

import (
"github.com/garyburd/redigo/redis"
"github.com/ouqiang/delay-queue/config"
"log"
"time"

"github.com/garyburd/redigo/redis"
"github.com/ouqiang/delay-queue/config"
)

var (
Expand Down
42 changes: 41 additions & 1 deletion routers/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package routers

import (
"encoding/json"
"github.com/ouqiang/delay-queue/delayqueue"
"io/ioutil"
"log"
"net/http"
"strings"
"time"

"github.com/ouqiang/delay-queue/delayqueue"
)

type PopRequest struct {
Expand Down Expand Up @@ -123,6 +124,45 @@ func Delete(resp http.ResponseWriter, req *http.Request) {
resp.Write(generateSuccessBody("操作成功", nil))
}

// 查询job
func Get(resp http.ResponseWriter, req *http.Request) {
var deleteRequest DeleteRequest
err := readBody(resp, req, &deleteRequest)
if err != nil {
return
}
id := strings.TrimSpace(deleteRequest.Id)
if id == "" {
resp.Write(generateFailureBody("job id不能为空"))
return
}
job, err := delayqueue.Get(id)
if err != nil {
log.Printf("查询job失败#%s", err.Error())
resp.Write(generateFailureBody("查询Job失败"))
return
}

if job == nil {
resp.Write(generateSuccessBody("操作成功", nil))
return
}

type Data struct {
Id string `json:"id"`
Body string `json:"body"`
}

data := Data{
Id: job.Id,
Body: job.Body,
}

log.Printf("get job#%+v", data)

resp.Write(generateSuccessBody("操作成功", data))
}

type ResponseBody struct {
Code int `json:"code"`
Message string `json:"message"`
Expand Down