Skip to content

Commit

Permalink
expose queues over admin API
Browse files Browse the repository at this point in the history
  • Loading branch information
reddec committed Jul 19, 2020
1 parent a7b1cf6 commit 5f88a1c
Show file tree
Hide file tree
Showing 11 changed files with 1,046 additions and 3 deletions.
48 changes: 48 additions & 0 deletions api/client/queues_api_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package client

import (
"context"
client "github.com/reddec/jsonrpc2/client"
api "github.com/reddec/trusted-cgi/api"
application "github.com/reddec/trusted-cgi/application"
"sync/atomic"
)

func DefaultQueuesAPI() *QueuesAPIClient {
return &QueuesAPIClient{BaseURL: "https://127.0.0.1:3434/u/"}
}

type QueuesAPIClient struct {
BaseURL string
sequence uint64
}

// Create queue and link it to lambda and start worker
func (impl *QueuesAPIClient) Create(ctx context.Context, token *api.Token, name string, lambda string) (reply *application.Queue, err error) {
err = client.CallHTTP(ctx, impl.BaseURL, "QueuesAPI.Create", atomic.AddUint64(&impl.sequence, 1), &reply, token, name, lambda)
return
}

// Remove queue and stop worker
func (impl *QueuesAPIClient) Remove(ctx context.Context, token *api.Token, name string) (reply bool, err error) {
err = client.CallHTTP(ctx, impl.BaseURL, "QueuesAPI.Remove", atomic.AddUint64(&impl.sequence, 1), &reply, token, name)
return
}

// Linked queues for lambda
func (impl *QueuesAPIClient) Linked(ctx context.Context, token *api.Token, lambda string) (reply []application.Queue, err error) {
err = client.CallHTTP(ctx, impl.BaseURL, "QueuesAPI.Linked", atomic.AddUint64(&impl.sequence, 1), &reply, token, lambda)
return
}

// List of all queues
func (impl *QueuesAPIClient) List(ctx context.Context, token *api.Token) (reply []application.Queue, err error) {
err = client.CallHTTP(ctx, impl.BaseURL, "QueuesAPI.List", atomic.AddUint64(&impl.sequence, 1), &reply, token)
return
}

// Assign lambda to queue (re-link)
func (impl *QueuesAPIClient) Assign(ctx context.Context, token *api.Token, name string, lambda string) (reply bool, err error) {
err = client.CallHTTP(ctx, impl.BaseURL, "QueuesAPI.Assign", atomic.AddUint64(&impl.sequence, 1), &reply, token, name, lambda)
return
}
121 changes: 121 additions & 0 deletions api/handlers/queues_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/handlers/user_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,17 @@ type UserAPI interface {
// Change password for the user
ChangePassword(ctx context.Context, token *Token, password string) (bool, error)
}

// API for managing queues
type QueuesAPI interface {
// Create queue and link it to lambda and start worker
Create(ctx context.Context, token *Token, name string, lambda string) (*application.Queue, error)
// Remove queue and stop worker
Remove(ctx context.Context, token *Token, name string) (bool, error)
// Linked queues for lambda
Linked(ctx context.Context, token *Token, lambda string) ([]application.Queue, error)
// List of all queues
List(ctx context.Context, token *Token) ([]application.Queue, error)
// Assign lambda to queue (re-link)
Assign(ctx context.Context, token *Token, name string, lambda string) (bool, error)
}
41 changes: 41 additions & 0 deletions api/services/queues_srv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package services

import (
"context"
"github.com/reddec/trusted-cgi/api"
"github.com/reddec/trusted-cgi/application"
)

func NewQueuesSrv(queues application.Queues) *queuesSrv {
return &queuesSrv{queues: queues}
}

type queuesSrv struct {
queues application.Queues
}

func (srv *queuesSrv) Create(ctx context.Context, token *api.Token, name string, lambda string) (*application.Queue, error) {
q := application.Queue{
Name: name,
Target: lambda,
}
return &q, srv.queues.Add(q)
}

func (srv *queuesSrv) Remove(ctx context.Context, token *api.Token, name string) (bool, error) {
err := srv.queues.Remove(name)
return err == nil, err
}

func (srv *queuesSrv) Linked(ctx context.Context, token *api.Token, lambda string) ([]application.Queue, error) {
return srv.queues.Find(lambda), nil
}

func (srv *queuesSrv) List(ctx context.Context, token *api.Token) ([]application.Queue, error) {
return srv.queues.List(), nil
}

func (srv *queuesSrv) Assign(ctx context.Context, token *api.Token, name string, lambda string) (bool, error) {
err := srv.queues.Assign(name, lambda)
return err == nil, err
}
114 changes: 114 additions & 0 deletions clients/js/queues_api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
export class QueuesAPIError extends Error {
constructor(message, code, details) {
super(code + ': ' + message);
this.code = code;
this.details = details;
}
}

export class QueuesAPI {
/**
API for managing queues
**/

// Create new API handler to QueuesAPI.
// preflightHandler (if defined) can return promise
constructor(base_url = 'https://127.0.0.1:3434/u/', preflightHandler = null) {
this.__url = base_url;
this.__id = 1;
this.__preflightHandler = preflightHandler;
}


/**
Create queue and link it to lambda and start worker
**/
async create(token, name, lambda){
return (await this.__call('Create', {
"jsonrpc" : "2.0",
"method" : "QueuesAPI.Create",
"id" : this.__next_id(),
"params" : [token, name, lambda]
}));
}

/**
Remove queue and stop worker
**/
async remove(token, name){
return (await this.__call('Remove', {
"jsonrpc" : "2.0",
"method" : "QueuesAPI.Remove",
"id" : this.__next_id(),
"params" : [token, name]
}));
}

/**
Linked queues for lambda
**/
async linked(token, lambda){
return (await this.__call('Linked', {
"jsonrpc" : "2.0",
"method" : "QueuesAPI.Linked",
"id" : this.__next_id(),
"params" : [token, lambda]
}));
}

/**
List of all queues
**/
async list(token){
return (await this.__call('List', {
"jsonrpc" : "2.0",
"method" : "QueuesAPI.List",
"id" : this.__next_id(),
"params" : [token]
}));
}

/**
Assign lambda to queue (re-link)
**/
async assign(token, name, lambda){
return (await this.__call('Assign', {
"jsonrpc" : "2.0",
"method" : "QueuesAPI.Assign",
"id" : this.__next_id(),
"params" : [token, name, lambda]
}));
}



__next_id() {
this.__id += 1;
return this.__id
}

async __call(method, req) {
const fetchParams = {
method: "POST",
headers: {
'Content-Type' : 'application/json',
},
body: JSON.stringify(req)
};
if (this.__preflightHandler) {
await Promise.resolve(this.__preflightHandler(method, fetchParams));
}
const res = await fetch(this.__url, fetchParams);
if (!res.ok) {
throw new Error(res.status + ' ' + res.statusText);
}

const data = await res.json();

if ('error' in data) {
throw new QueuesAPIError(data.error.message, data.error.code, data.error.data);
}

return data.result;
}
}
Loading

0 comments on commit 5f88a1c

Please sign in to comment.