Skip to content

Commit

Permalink
add delete and list methods for queue api
Browse files Browse the repository at this point in the history
  • Loading branch information
mylxsw committed Oct 28, 2019
1 parent 7747762 commit 86c5bb8
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 25 deletions.
65 changes: 65 additions & 0 deletions api/controller/queue.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package controller

import (
"errors"
"fmt"
"net/http"

"github.com/mylxsw/adanos-alert/internal/queue"
"github.com/mylxsw/adanos-alert/internal/repository"
"github.com/mylxsw/container"
"github.com/mylxsw/hades"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type QueueController struct {
Expand All @@ -19,6 +24,9 @@ func NewQueueController(cc *container.Container) hades.Controller {
func (q *QueueController) Register(router *hades.Router) {
router.Group("/queue/", func(router *hades.Router) {
router.Post("/control/", q.Control).Name("queue:control")
router.Get("/jobs/", q.Jobs).Name("queue:jobs:all")
router.Delete("/jobs/{id}/", q.Delete).Name("queue:jobs:delete")
router.Get("/jobs/{id}/", q.Job).Name("queue:jobs:one")
})
}

Expand All @@ -41,3 +49,60 @@ func (q *QueueController) Control(ctx hades.Context, manager queue.Manager) hade
"info": manager.Info(),
})
}

type QueueJobsResp struct {
Jobs []repository.QueueJob `json:"jobs"`
Next int64 `json:"next"`
}

func (q *QueueController) Jobs(ctx hades.Context, repo repository.QueueRepo) (*QueueJobsResp, error) {
offset := ctx.Int64Input("offset", 0)
limit := ctx.Int64Input("limit", 10)

filter := bson.M{}

status := ctx.Input("status")
if status != "" {
filter["status"] = status
}

jobs, next, err := repo.Paginate(filter, offset, limit)
if err != nil {
return nil, hades.WrapJSONError(err, http.StatusInternalServerError)
}

return &QueueJobsResp{
Jobs: jobs,
Next: next,
}, nil
}

func (q *QueueController) Job(ctx hades.Context, repo repository.QueueRepo) (*repository.QueueJob, error) {
jobID, err := primitive.ObjectIDFromHex(ctx.PathVar("id"))
if err != nil {
return nil, hades.WrapJSONError(fmt.Errorf("invalid request: %v", err), http.StatusUnprocessableEntity)
}

job, err := repo.Get(jobID)
if err != nil {
if err == repository.ErrNotFound {
return nil, hades.WrapJSONError(errors.New("no such job"), http.StatusNotFound)
}

return nil, hades.WrapJSONError(err, http.StatusInternalServerError)
}

return &job, nil
}

func (q *QueueController) Delete(ctx hades.Context, repo repository.QueueRepo) error {
jobID, err := primitive.ObjectIDFromHex(ctx.PathVar("id"))
if err != nil {
return hades.WrapJSONError(fmt.Errorf("invalid request: %v", err), http.StatusUnprocessableEntity)
}

return repo.Delete(bson.M{
"_id": jobID,
"status": bson.M{"$ne": repository.QueueItemStatusRunning},
})
}
2 changes: 1 addition & 1 deletion internal/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (q *QueueAction) Handle(rule repository.Rule, trigger repository.Trigger, g
Rule: rule,
}

id, err := queueManager.Enqueue(repository.QueueItem{
id, err := queueManager.Enqueue(repository.QueueJob{
Name: "action",
Payload: string(payload.Encode()),
})
Expand Down
2 changes: 1 addition & 1 deletion internal/action/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s ServiceProvider) Boot(app *glacier.Glacier) {
manager.Register("email", NewEmailAction(manager))
manager.Register("wechat", NewWechatAction(manager))

queueManager.RegisterHandler("action", func(item repository.QueueItem) error {
queueManager.RegisterHandler("action", func(item repository.QueueJob) error {
var payload Payload
if err := payload.Decode([]byte(item.Payload)); err != nil {
log.WithFields(log.Fields{
Expand Down
10 changes: 5 additions & 5 deletions internal/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ import (
)

type Manager interface {
Enqueue(item repository.QueueItem) (string, error)
Enqueue(item repository.QueueJob) (string, error)
StartWorker(ctx context.Context, workerID string)
Pause(pause bool)
Paused() bool
Info() Info
RegisterHandler(name string, handler Handler)
}

type Handler func(item repository.QueueItem) error
type Handler func(item repository.QueueJob) error

type Info struct {
StartAt time.Time `json:"start_at"`
Expand Down Expand Up @@ -87,7 +87,7 @@ func (manager *queueManager) Pause(pause bool) {
}

// Enqueue add an item to queue
func (manager *queueManager) Enqueue(item repository.QueueItem) (string, error) {
func (manager *queueManager) Enqueue(item repository.QueueJob) (string, error) {
manager.lock.RLock()
defer manager.lock.RUnlock()

Expand Down Expand Up @@ -157,7 +157,7 @@ func (manager *queueManager) run(ctx context.Context) {
}
}

func (manager *queueManager) handle(ctx context.Context, item repository.QueueItem) {
func (manager *queueManager) handle(ctx context.Context, item repository.QueueJob) {
manager.lock.RLock()
handler, ok := manager.handlers[item.Name]
manager.lock.RUnlock()
Expand Down Expand Up @@ -216,7 +216,7 @@ func (manager *queueManager) handle(ctx context.Context, item repository.QueueIt
}

func eliminatePanic(cb Handler) Handler {
return func(item repository.QueueItem) (err error) {
return func(item repository.QueueJob) (err error) {
defer func() {
if err2 := recover(); err2 != nil {
err = fmt.Errorf("handler panic with: %v", err2)
Expand Down
31 changes: 21 additions & 10 deletions internal/repository/impl/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewQueueRepo(db *mongo.Database) repository.QueueRepo {
return &QueueRepo{col: db.Collection("queue")}
}

func (q *QueueRepo) Enqueue(item repository.QueueItem) (id primitive.ObjectID, err error) {
func (q *QueueRepo) Enqueue(item repository.QueueJob) (id primitive.ObjectID, err error) {
if item.ID.IsZero() {
item.CreatedAt = time.Now()
item.UpdatedAt = item.CreatedAt
Expand Down Expand Up @@ -51,7 +51,7 @@ func (q *QueueRepo) Enqueue(item repository.QueueItem) (id primitive.ObjectID, e
return item.ID, nil
}

func (q *QueueRepo) Dequeue() (item repository.QueueItem, err error) {
func (q *QueueRepo) Dequeue() (item repository.QueueJob, err error) {
rs := q.col.FindOneAndUpdate(
context.TODO(),
bson.M{"status": repository.QueueItemStatusWait, "next_execute_at": bson.M{"$lt": time.Now()}},
Expand All @@ -70,14 +70,21 @@ func (q *QueueRepo) Dequeue() (item repository.QueueItem, err error) {
return
}

func (q *QueueRepo) Paginate(filter bson.M, offset, limit int64) (items []repository.QueueItem, next int64, err error) {
cur, err := q.col.Find(context.TODO(), filter, options.Find().SetSkip(offset).SetLimit(limit))
func (q *QueueRepo) Paginate(filter bson.M, offset, limit int64) (items []repository.QueueJob, next int64, err error) {
cur, err := q.col.Find(
context.TODO(),
filter,
options.Find().
SetSkip(offset).
SetLimit(limit).
SetSort(bson.M{"next_execute_at": 1}),
)
if err != nil {
return
}

for cur.Next(context.TODO()) {
var item repository.QueueItem
var item repository.QueueJob
if err = cur.Decode(&item); err != nil {
return
}
Expand All @@ -101,9 +108,9 @@ func (q *QueueRepo) DeleteID(id primitive.ObjectID) error {
return q.Delete(bson.M{"_id": id})
}

func (q *QueueRepo) Get(id primitive.ObjectID) (repository.QueueItem, error) {
func (q *QueueRepo) Get(id primitive.ObjectID) (repository.QueueJob, error) {
rs := q.col.FindOne(context.TODO(), bson.M{"_id": id})
var item repository.QueueItem
var item repository.QueueJob
if err := rs.Decode(&item); err != nil {
if err == mongo.ErrNoDocuments {
return item, repository.ErrNotFound
Expand All @@ -118,12 +125,16 @@ func (q *QueueRepo) Count(filter bson.M) (int64, error) {
return q.col.CountDocuments(context.TODO(), filter)
}

func (q *QueueRepo) Update(filter bson.M, item repository.QueueItem) error {
func (q *QueueRepo) Update(filter bson.M, item repository.QueueJob) error {
item.UpdatedAt = time.Now()
if item.NextExecuteAt.Before(item.UpdatedAt) {
item.NextExecuteAt = item.UpdatedAt
}

_, err := q.col.ReplaceOne(context.TODO(), filter, item)
return err
}

func (q *QueueRepo) UpdateID(id primitive.ObjectID, item repository.QueueItem) error {
return q.Update(bson.M{"_id": id}, item)
func (q *QueueRepo) UpdateID(id primitive.ObjectID, jobItem repository.QueueJob) error {
return q.Update(bson.M{"_id": id}, jobItem)
}
2 changes: 1 addition & 1 deletion internal/repository/impl/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (q *QueueTestSuit) TestEnqueueDequeue() {
q.Equal(repository.ErrNotFound, err)

// test enqueue
item := repository.QueueItem{
item := repository.QueueJob{
Name: "action",
Payload: "{}",
}
Expand Down
14 changes: 7 additions & 7 deletions internal/repository/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
QueueItemStatusCanceled QueueItemStatus = "canceled"
)

type QueueItem struct {
type QueueJob struct {
ID primitive.ObjectID `bson:"_id" json:"id"`
Name string `bson:"name" json:"name"`
Payload string `bson:"payload" json:"payload"`
Expand All @@ -33,13 +33,13 @@ type QueueRepo interface {
// Enqueue add a item to queue
// if the item is new(id is empty), add it to queue
// if the item is already existed, replace it
Enqueue(item QueueItem) (primitive.ObjectID, error)
Dequeue() (QueueItem, error)
UpdateID(id primitive.ObjectID, item QueueItem) error
Update(filter bson.M, item QueueItem) error
Paginate(filter bson.M, offset, limit int64) (items []QueueItem, next int64, err error)
Enqueue(item QueueJob) (primitive.ObjectID, error)
Dequeue() (QueueJob, error)
UpdateID(id primitive.ObjectID, jobItem QueueJob) error
Update(filter bson.M, item QueueJob) error
Paginate(filter bson.M, offset, limit int64) (items []QueueJob, next int64, err error)
Delete(filter bson.M) error
DeleteID(id primitive.ObjectID) error
Get(id primitive.ObjectID) (QueueItem, error)
Get(id primitive.ObjectID) (QueueJob, error)
Count(filter bson.M) (int64, error)
}

0 comments on commit 86c5bb8

Please sign in to comment.