Skip to content

Commit

Permalink
add requeue for queues
Browse files Browse the repository at this point in the history
  • Loading branch information
reddec committed Jul 20, 2020
1 parent 1bbdfbf commit 79c8722
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 40 deletions.
4 changes: 2 additions & 2 deletions api/client/queues_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type QueuesAPIClient struct {
}

// Create queue and link it to lambda and start worker
func (impl *QueuesAPIClient) Create(ctx context.Context, token *api.Token, name string, lambda string) (reply *application.Queue, err error) {
err = client.CallHTTP(ctx, impl.BaseURL, "QueuesAPI.Create", atomic.AddUint64(&impl.sequence, 1), &reply, token, name, lambda)
func (impl *QueuesAPIClient) Create(ctx context.Context, token *api.Token, queue application.Queue) (reply *application.Queue, err error) {
err = client.CallHTTP(ctx, impl.BaseURL, "QueuesAPI.Create", atomic.AddUint64(&impl.sequence, 1), &reply, token, queue)
return
}

Expand Down
10 changes: 5 additions & 5 deletions api/handlers/queues_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type UserAPI interface {
// API for managing queues
type QueuesAPI interface {
// Create queue and link it to lambda and start worker
Create(ctx context.Context, token *Token, name string, lambda string) (*application.Queue, error)
Create(ctx context.Context, token *Token, queue application.Queue) (*application.Queue, error)
// Remove queue and stop worker
Remove(ctx context.Context, token *Token, name string) (bool, error)
// Linked queues for lambda
Expand Down
8 changes: 2 additions & 6 deletions api/services/queues_srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ type queuesSrv struct {
queues application.Queues
}

func (srv *queuesSrv) Create(ctx context.Context, token *api.Token, name string, lambda string) (*application.Queue, error) {
q := application.Queue{
Name: name,
Target: lambda,
}
return &q, srv.queues.Add(q)
func (srv *queuesSrv) Create(ctx context.Context, token *api.Token, queue application.Queue) (*application.Queue, error) {
return &queue, srv.queues.Add(queue)
}

func (srv *queuesSrv) Remove(ctx context.Context, token *api.Token, name string) (bool, error) {
Expand Down
45 changes: 34 additions & 11 deletions application/queuemanager/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (qm *queueManager) addQueueUnsafe(queue application.Queue) error {

q = &queueDefinition{
Queue: queue,
worker: startWorker(qm.ctx, queue.Name, back, queue.Target, qm.platform, &qm.wg),
worker: startWorker(qm.ctx, back, queue, qm.platform, &qm.wg),
queue: back,
}
if qm.queues == nil {
Expand Down Expand Up @@ -138,7 +138,7 @@ func (qm *queueManager) Assign(queue string, targetLambda string) error {
q.worker.stop()
<-q.worker.done
q.Target = targetLambda
q.worker = startWorker(qm.ctx, queue, q.queue, targetLambda, qm.platform, &qm.wg)
q.worker = startWorker(qm.ctx, q.queue, q.Queue, qm.platform, &qm.wg)
return qm.config.SetQueues(qm.listUnsafe())
}

Expand Down Expand Up @@ -185,7 +185,7 @@ type worker struct {
done chan struct{}
}

func startWorker(gctx context.Context, name string, queue queue.Queue, uid string, plt Platform, wg *sync.WaitGroup) *worker {
func startWorker(gctx context.Context, queue queue.Queue, definition application.Queue, plt Platform, wg *sync.WaitGroup) *worker {
ctx, cancel := context.WithCancel(gctx)
w := &worker{
stop: cancel,
Expand All @@ -196,19 +196,15 @@ func startWorker(gctx context.Context, name string, queue queue.Queue, uid strin
defer wg.Done()
defer close(w.done)
for {
req, err := queue.Peek(ctx)

err := doTask(ctx, plt, definition, queue)
if err != nil {
log.Println("queues: queue", definition.Name, "failed process task:", err)
}
select {
case <-ctx.Done():
return
default:
}

if err != nil {
log.Println("queues: failed peek", name, ":", err)
} else if err = plt.InvokeByUID(ctx, uid, *req, os.Stderr); err != nil {
log.Println("queues: failed invoke by uid", uid, "from queue", name, ":", err)
}
err = queue.Commit(ctx)
if err != nil {
log.Println("queues: failed commit - waiting", commitFailedDelay)
Expand All @@ -224,4 +220,31 @@ func startWorker(gctx context.Context, name string, queue queue.Queue, uid strin
return w
}

func doTask(ctx context.Context, plt Platform, definition application.Queue, queue queue.Queue) error {
for i := 0; i <= definition.Retry; i++ {
req, err := queue.Peek(ctx)

select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err != nil {
log.Println("queues: failed peek", definition.Name, ":", err)
} else if err = plt.InvokeByUID(ctx, definition.Target, *req, os.Stderr); err != nil {
log.Println("queues: failed invoke by uid", definition.Target, "from queue", definition.Name, ":", err)
} else {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Duration(definition.Interval)):
}
}
return fmt.Errorf("failed to process task for queue %s after all attempts", definition.Name)
}

const commitFailedDelay = 3 * time.Second
6 changes: 4 additions & 2 deletions application/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (cfg *Config) ReadFile(file string) error {
}

type Queue struct {
Name string `json:"name"`
Target string `json:"target"`
Name string `json:"name"`
Target string `json:"target"`
Retry int `json:"retry"` // number of additional attempts
Interval types.JsonDuration `json:"interval"` // delay between attempts
}
4 changes: 2 additions & 2 deletions clients/js/queues_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ export class QueuesAPI {
/**
Create queue and link it to lambda and start worker
**/
async create(token, name, lambda){
async create(token, queue){
return (await this.__call('Create', {
"jsonrpc" : "2.0",
"method" : "QueuesAPI.Create",
"id" : this.__next_id(),
"params" : [token, name, lambda]
"params" : [token, queue]
}));
}

Expand Down
14 changes: 10 additions & 4 deletions clients/python/queues_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,24 @@
class Queue:
name: 'str'
target: 'str'
retry: 'int'
interval: 'Any'

def to_json(self) -> dict:
return {
"name": self.name,
"target": self.target,
"retry": self.retry,
"interval": self.interval,
}

@staticmethod
def from_json(payload: dict) -> 'Queue':
return Queue(
name=payload['name'],
target=payload['target'],
retry=payload['retry'],
interval=payload['interval'],
)


Expand Down Expand Up @@ -56,15 +62,15 @@ def __next_id(self):
self.__id += 1
return self.__id

async def create(self, token: Any, name: str, lambda: str) -> Queue:
async def create(self, token: Any, queue: Queue) -> Queue:
"""
Create queue and link it to lambda and start worker
"""
response = await self._invoke({
"jsonrpc": "2.0",
"method": "QueuesAPI.Create",
"id": self.__next_id(),
"params": [token, name, lambda, ]
"params": [token, queue.to_json(), ]
})
assert response.status // 100 == 2, str(response.status) + " " + str(response.reason)
payload = await response.json()
Expand Down Expand Up @@ -156,11 +162,11 @@ def __next_id(self):
self.__id += 1
return self.__id

def create(self, token: Any, name: str, lambda: str):
def create(self, token: Any, queue: Queue):
"""
Create queue and link it to lambda and start worker
"""
params = [token, name, lambda, ]
params = [token, queue.to_json(), ]
method = "QueuesAPI.Create"
self.__add_request(method, params, lambda payload: Queue.from_json(payload))

Expand Down
8 changes: 6 additions & 2 deletions clients/ts/queues_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ export class QueuesAPIError extends Error {
export interface Queue {
name: string
target: string
retry: number
interval: JsonDuration
}

export type JsonDuration = string; // suffixes: ns, us, ms, s, m, h

export type Token = string;


Expand Down Expand Up @@ -183,12 +187,12 @@ export class QueuesAPI {
/**
Create queue and link it to lambda and start worker
**/
async create(token: Token, name: string, lambda: string): Promise<Queue> {
async create(token: Token, queue: Queue): Promise<Queue> {
return (await this.__call({
"jsonrpc" : "2.0",
"method" : "QueuesAPI.Create",
"id" : this.__next_id(),
"params" : [token, name, lambda]
"params" : [token, queue]
})) as Queue;
}

Expand Down
9 changes: 7 additions & 2 deletions docs/api/queues_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ Create queue and link it to lambda and start worker
| Position | Name | Type |
|----------|------|------|
| 0 | token | `*Token` |
| 1 | name | `string` |
| 2 | lambda | `string` |
| 1 | queue | `Queue` |

```bash
curl -H 'Content-Type: application/json' --data-binary @- "https://127.0.0.1:3434/u/" <<EOF
Expand All @@ -50,6 +49,8 @@ EOF
|------|------|---------|
| name | `string` | |
| target | `string` | |
| retry | `int` | |
| interval | `types.JsonDuration` | |

### Token

Expand Down Expand Up @@ -118,6 +119,8 @@ EOF
|------|------|---------|
| name | `string` | |
| target | `string` | |
| retry | `int` | |
| interval | `types.JsonDuration` | |

### Token

Expand Down Expand Up @@ -155,6 +158,8 @@ EOF
|------|------|---------|
| name | `string` | |
| target | `string` | |
| retry | `int` | |
| interval | `types.JsonDuration` | |

### Token

Expand Down
8 changes: 6 additions & 2 deletions docs/usage/queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ Queues that bound to the lambda could be found in Overview -> Endpoint page.

Queue can be re-assigned to another lambda without destroy.

In case of failure, task will be re-tried after defined interval with limited number of attempts. 0 retry means
no **additional attempts** - at least once the task will be processed. After failure, a queue worker will wait required
time, and it will not process other tasks.

After lambda removal, linked queues are also will be **automatically removed**.

Designed for

* to provide async processing for long-running tasks;

NOT designed for

* load balancing (but possible using multiple queues);
* failure-tolerance: failed request will not be re-queued;


Endpoint: `/q/:queue-name`

Expand Down

0 comments on commit 79c8722

Please sign in to comment.