Skip to content

Commit

Permalink
add queue manager to a main flow
Browse files Browse the repository at this point in the history
  • Loading branch information
reddec committed Jul 16, 2020
1 parent 98e4e43 commit add5c3a
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 6 deletions.
28 changes: 26 additions & 2 deletions application/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ func HandlerByLinks(globalCtx context.Context, tracker stats.Recorder, platform
return handler(globalCtx, platform, tracker, platform.FindByLink)
}

// Expose lambda handlers by UID (/a/) and by links (/l/)
func Handler(globalCtx context.Context, tracker stats.Recorder, platform Platform) *http.ServeMux {
// Expose queues over HTTP handler. First part of path will be used as queue name
func HandlerByQueues(queues Queues) http.HandlerFunc {
return handlerQueue(queues)
}

// Expose lambda handlers by UID (/a/) and by links (/l/) and for queues (/q/)
func Handler(globalCtx context.Context, tracker stats.Recorder, platform Platform, queues Queues) *http.ServeMux {
mux := http.NewServeMux()
mux.Handle("/a/", http.StripPrefix("/a/", HandlerByUID(globalCtx, tracker, platform)))
mux.Handle("/l/", http.StripPrefix("/l/", HandlerByLinks(globalCtx, tracker, platform)))
mux.Handle("/q/", http.StripPrefix("/q/", HandlerByQueues(queues)))
//TODO: queue balancer
return mux
}

Expand Down Expand Up @@ -63,3 +70,20 @@ func handler(globalCtx context.Context, platform Platform, tracker stats.Recorde
tracker.Track(record)
}
}

func handlerQueue(queues Queues) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()
sections := strings.SplitN(strings.Trim(request.URL.Path, "/"), "/", 2)
uid := sections[0]
req := types.FromHTTP(request)

err := queues.Put(uid, req)

if err != nil {
http.Error(writer, err.Error(), http.StatusNotFound)
return
}
writer.WriteHeader(http.StatusNoContent)
}
}
16 changes: 15 additions & 1 deletion application/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"github.com/reddec/trusted-cgi/templates"
"github.com/reddec/trusted-cgi/types"
"io/ioutil"
"log"
"os"
"path/filepath"
"time"
)

func New(platform application.Platform, dir, templateDir string) (*casesImpl, error) {
func New(platform application.Platform, queues application.Queues, dir, templateDir string) (*casesImpl, error) {
aTemplateDir, err := filepath.Abs(templateDir)
if err != nil {
return nil, fmt.Errorf("resolve template dir: %w", err)
Expand All @@ -27,6 +28,7 @@ func New(platform application.Platform, dir, templateDir string) (*casesImpl, er
directory: aDir,
templatesDir: aTemplateDir,
platform: platform,
queues: queues,
}
return cs, cs.Scan()
}
Expand All @@ -37,6 +39,7 @@ type casesImpl struct {
directory string
templatesDir string
platform application.Platform
queues application.Queues
}

func (impl *casesImpl) Scan() error {
Expand Down Expand Up @@ -142,5 +145,16 @@ func (impl *casesImpl) Remove(uid string) error {
return fmt.Errorf("remove - find by uid %s: %w", uid, err)
}
impl.platform.Remove(uid)
// unlink queues
for _, q := range impl.queues.Find(uid) {
err := impl.queues.Remove(q.Name)
if err != nil {
log.Println("[ERROR]", "failed remove queue", q.Name)
}
}
return fn.Lambda.Remove()
}

func (impl *casesImpl) Queues() application.Queues {
return impl.queues
}
22 changes: 22 additions & 0 deletions application/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/reddec/trusted-cgi/templates"
"github.com/reddec/trusted-cgi/types"
"io"
"regexp"
"time"
)

Expand Down Expand Up @@ -107,10 +108,31 @@ type Cases interface {
Remove(uid string) error
// Get underlying platform
Platform() Platform
// Get underlying queues manager
Queues() Queues
// Run scheduled actions from all lambda. Saves last run
RunScheduledActions(ctx context.Context)
// List of all templates without availability check
Templates() (map[string]*templates.Template, error)
// Content of SSH public key if set
PublicSSHKey() ([]byte, error)
}

// Queue name limitations
var QueueNameReg = regexp.MustCompile(`^[a-z0-9A-Z-]{3,64}$`)

// Queues manager. Manages queues and linked worker
type Queues interface {
// Put request to queue. If queue not exists, an error will be thrown
Put(queue string, request *types.Request) error
// Add new queue. See QueueNameReg for limitations
Add(queue Queue) error
// Remove queue and worker
Remove(queue string) error
// Assign queue to another lambda. If lambda is empty, queue will be stopped (but Put will still work)
Assign(queue string, targetLambda string) error
// List of all defined queue.
List() []Queue
// Find queues linked to lambda
Find(targetLambda string) []Queue
}
195 changes: 195 additions & 0 deletions application/queues/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package queues

import (
"context"
"fmt"
"github.com/reddec/trusted-cgi/application"
"github.com/reddec/trusted-cgi/queue"
"github.com/reddec/trusted-cgi/types"
"io"
"log"
"os"
"sort"
"sync"
"time"
)

type platform interface {
FindByUID(uid string) (*application.Definition, error)
Invoke(ctx context.Context, lambda application.Lambda, request types.Request, out io.Writer) error
}

type QueueFactory func(name string) (queue.Queue, error)

func New(ctx context.Context, platform platform, factory QueueFactory) *queueManager {
return &queueManager{
ctx: ctx,
platform: platform,
queues: map[string]*queueDefinition{},
queueFactory: factory,
}
}

type queueManager struct {
ctx context.Context
lock sync.RWMutex
platform platform
queues map[string]*queueDefinition
queueFactory QueueFactory
wg sync.WaitGroup
}

func (qm *queueManager) Put(queue string, request *types.Request) error {
qm.lock.RLock()
defer qm.lock.RUnlock()
defer request.Body.Close()
q, ok := qm.queues[queue]
if !ok {
return fmt.Errorf("queue %s does not exist", queue)
}
return q.queue.Put(qm.ctx, request)
}

func (qm *queueManager) Add(queue application.Queue) error {
if !application.QueueNameReg.MatchString(queue.Name) {
return fmt.Errorf("invalid queue name: should be %v", application.QueueNameReg)
}
qm.lock.Lock()
defer qm.lock.Unlock()
q, ok := qm.queues[queue.Name]
if ok {
return fmt.Errorf("queue %s already exists", queue.Name)
}

lambda, err := qm.platform.FindByUID(queue.Target)
if err != nil {
return err
}

back, err := qm.queueFactory(queue.Name)
if err != nil {
return err
}

q = &queueDefinition{
Queue: application.Queue{},
worker: startWorker(qm.ctx, back, lambda.Lambda, qm.platform, &qm.wg),
queue: back,
}
if qm.queues == nil {
qm.queues = make(map[string]*queueDefinition)
}
qm.queues[queue.Name] = q
return nil
}

func (qm *queueManager) Remove(queue string) error {
qm.lock.Lock()
defer qm.lock.Unlock()
q, ok := qm.queues[queue]
if !ok {
return nil
}
q.worker.stop()
<-q.worker.done
delete(qm.queues, queue)
return q.queue.Destroy()
}

func (qm *queueManager) Assign(queue string, targetLambda string) error {
lambda, err := qm.platform.FindByUID(targetLambda)
if err != nil {
return err
}
qm.lock.Lock()
defer qm.lock.Unlock()
q, ok := qm.queues[queue]
if !ok {
return fmt.Errorf("queue %s does not exist", queue)
}
q.worker.stop()
<-q.worker.done
q.Target = targetLambda
q.worker = startWorker(qm.ctx, q.queue, lambda.Lambda, qm.platform, &qm.wg)
return nil
}

func (qm *queueManager) List() []application.Queue {
var ans = make([]application.Queue, 0, len(qm.queues))
qm.lock.RLock()
for _, q := range qm.queues {
ans = append(ans, q.Queue)
}
qm.lock.RUnlock()
sort.Slice(ans, func(i, j int) bool {
return ans[i].Name < ans[j].Name
})
return ans
}

func (qm *queueManager) Find(targetLambda string) []application.Queue {
var ans = make([]application.Queue, 0)
qm.lock.RLock()
defer qm.lock.RUnlock()
for _, q := range qm.queues {
if q.Target == targetLambda {
ans = append(ans, q.Queue)
}
}
return ans
}

func (qm *queueManager) Wait() {
qm.wg.Wait()
}

type queueDefinition struct {
application.Queue
worker *worker
queue queue.Queue
}

type worker struct {
stop func()
done chan struct{}
}

func startWorker(gctx context.Context, queue queue.Queue, lambda application.Lambda, plt platform, wg *sync.WaitGroup) *worker {
ctx, cancel := context.WithCancel(gctx)
w := &worker{
stop: cancel,
done: make(chan struct{}),
}
go func() {
defer wg.Done()
defer close(w.done)
for {
req, err := queue.Peek(ctx)

select {
case <-ctx.Done():
return
default:
}

if err != nil {
log.Println("queues: failed peek")
} else if err = plt.Invoke(ctx, lambda, *req, os.Stderr); err != nil {
log.Println("queues: failed invoke:", err)
}
err = queue.Commit(ctx)
if err != nil {
log.Println("queues: failed commit - waiting", commitFailedDelay)
select {
case <-time.After(commitFailedDelay):
case <-ctx.Done():
return
}
}
}
}()

return w
}

const commitFailedDelay = 3 * time.Second
5 changes: 5 additions & 0 deletions application/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,8 @@ func (cfg *Config) ReadFile(file string) error {
defer f.Close()
return json.NewDecoder(f).Decode(cfg)
}

type Queue struct {
Name string `json:"name"`
Target string `json:"target"`
}
22 changes: 20 additions & 2 deletions cmd/trusted-cgi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
"github.com/reddec/trusted-cgi/application"
"github.com/reddec/trusted-cgi/application/cases"
"github.com/reddec/trusted-cgi/application/platform"
"github.com/reddec/trusted-cgi/application/queues"
"github.com/reddec/trusted-cgi/cmd/internal"
internal2 "github.com/reddec/trusted-cgi/internal"
"github.com/reddec/trusted-cgi/queue"
"github.com/reddec/trusted-cgi/queue/indir"
"github.com/reddec/trusted-cgi/queue/inmemory"
"github.com/reddec/trusted-cgi/server"
"github.com/reddec/trusted-cgi/stats/impl/memlog"
"log"
Expand All @@ -25,6 +29,7 @@ type Config struct {
Config string `short:"c" long:"config" env:"CONFIG" description:"Location of server configuration" default:"server.json"`
Dir string `short:"d" long:"dir" env:"DIR" description:"Project directory" default:"."`
Templates string `long:"templates" env:"TEMPLATES" description:"Templates directory" default:".templates"`
Queues string `long:"queues" env:"QUEUES" description:"Queues directory (- means memory)" default:".queues"`
//
InitialAdminPassword string `long:"initial-admin-password" env:"INITIAL_ADMIN_PASSWORD" description:"Initial admin password" default:"admin"`
InitialChrootUser string `long:"initial-chroot-user" env:"INITIAL_CHROOT_USER" description:"Initial user for service" default:""`
Expand Down Expand Up @@ -103,7 +108,9 @@ func run(ctx context.Context, config Config) error {
return err
}

useCases, err := cases.New(basePlatform, config.Dir, config.Templates)
queueManager := queues.New(ctx, basePlatform, config.getQueueFactory())

useCases, err := cases.New(basePlatform, queueManager, config.Dir, config.Templates)
if err != nil {
return err
}
Expand All @@ -127,7 +134,7 @@ func run(ctx context.Context, config Config) error {
defer tracker.Dump()
go dumpTracker(ctx, config.StatsInterval, tracker)

handler, err := server.Handler(ctx, config.Dev, basePlatform, tracker, userApi, projectApi, lambdaApi, userApi)
handler, err := server.Handler(ctx, config.Dev, basePlatform, queueManager, tracker, userApi, projectApi, lambdaApi, userApi)
if err != nil {
return err
}
Expand Down Expand Up @@ -165,3 +172,14 @@ func runScheduler(ctx context.Context, each time.Duration, runner application.Ca
runner.RunScheduledActions(ctx)
}
}

func (cfg Config) getQueueFactory() func(name string) (queue.Queue, error) {
if cfg.Queues == "-" {
return func(name string) (queue.Queue, error) {
return inmemory.New(32), nil
}
}
return func(name string) (queue.Queue, error) {
return indir.New(filepath.Join(cfg.Queues, name))
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/google/uuid v1.1.1
github.com/jessevdk/go-flags v1.4.1-0.20180331124232-1c38ed7ad0cc
github.com/philhofer/fwd v1.0.0 // indirect
github.com/reddec/dfq v0.0.0-20200715120610-5e45d6cc3525
github.com/reddec/dfq v0.0.0-20200716124604-b347d756a38c
github.com/reddec/jsonrpc2 v0.1.18
github.com/robfig/cron v1.2.0
github.com/stretchr/testify v1.5.1
Expand Down
Loading

0 comments on commit add5c3a

Please sign in to comment.