Skip to content

Commit

Permalink
support queues configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
reddec committed Jul 17, 2020
1 parent 6422253 commit ca4e2d8
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 49 deletions.
12 changes: 9 additions & 3 deletions application/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type Actions interface {
DoScheduled(ctx context.Context, lastRun time.Time, globalEnv map[string]string)
}

type Invokable interface {
// Invoke request, write response. Required header should be set by invoker
Invoke(ctx context.Context, request types.Request, response io.Writer, globalEnv map[string]string) error
}

// Basic invokable entity
//
// Highlights:
Expand All @@ -47,6 +52,7 @@ type Actions interface {
type Lambda interface {
FileSystem
Actions
Invokable
// Manifest configuration
Manifest() types.Manifest
// Update manifest and apply changes (re-index)
Expand All @@ -57,8 +63,6 @@ type Lambda interface {
SetCredentials(creds *types.Credential) error
// Remove lambda
Remove() error
// Invoke request, write response. Required header should be set by invoker
Invoke(ctx context.Context, request types.Request, response io.Writer, globalEnv map[string]string) error
}

// Platform should index lambda, keep shared info (like env) and apply global configuration
Expand Down Expand Up @@ -91,7 +95,9 @@ type Platform interface {
// Remove existent lambda from platform and index (doesn't call underlying Remove() method)
Remove(uid string)
// Invoke lambda with platform global environment and logs results to tracker (if set)
Invoke(ctx context.Context, lambda Lambda, request types.Request, out io.Writer) error
Invoke(ctx context.Context, lambda Invokable, request types.Request, out io.Writer) error
// Same as Find + Invoke, but caller has no control on NotFound error
InvokeByUID(ctx context.Context, uid string, request types.Request, out io.Writer) error
// Do lambda action target defined in Makefile with platform global environment. Time limit and out can be nil
Do(ctx context.Context, lambda Lambda, action string, timeLimit time.Duration, out io.Writer) error
}
Expand Down
11 changes: 10 additions & 1 deletion application/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,16 @@ func (platform *platform) Remove(uid string) {
_ = platform.unsafeSaveConfig()
}

func (platform *platform) Invoke(ctx context.Context, lambda application.Lambda, request types.Request, out io.Writer) error {
func (platform *platform) InvokeByUID(ctx context.Context, uid string, request types.Request, out io.Writer) error {
lambda, err := platform.FindByUID(uid)
if err != nil {
_ = request.Body.Close()
return err
}
return platform.Invoke(ctx, lambda.Lambda, request, out)
}

func (platform *platform) Invoke(ctx context.Context, lambda application.Invokable, request types.Request, out io.Writer) error {
return lambda.Invoke(ctx, request, out, platform.config.Environment)
}

Expand Down
66 changes: 66 additions & 0 deletions application/queuemanager/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package queuemanager

import (
"github.com/reddec/trusted-cgi/application"
"github.com/reddec/trusted-cgi/internal"
"os"
"sync"
)

type naiveFileStorePayload struct {
Queues []application.Queue `json:"queues"`
}

func FileConfig(filename string) *naiveFileStore {
return &naiveFileStore{file: filename}
}

type naiveFileStore struct {
file string
lock sync.RWMutex
}

func (nfs *naiveFileStore) SetQueues(queues []application.Queue) error {
nfs.lock.Lock()
defer nfs.lock.Unlock()
return internal.AtomicWriteJson(nfs.file, &naiveFileStorePayload{Queues: queues})
}

func (nfs *naiveFileStore) GetQueues() ([]application.Queue, error) {
nfs.lock.RLock()
defer nfs.lock.RUnlock()
var pd naiveFileStorePayload
err := internal.ReadJson(nfs.file, &pd)
if err == nil {
return pd.Queues, nil
}
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}

func Mock(queues ...application.Queue) *mockStore {
return &mockStore{queues: queues}
}

type mockStore struct {
lock sync.RWMutex
queues []application.Queue
}

func (msc *mockStore) SetQueues(queues []application.Queue) error {
msc.lock.Lock()
defer msc.lock.Unlock()
msc.queues = make([]application.Queue, len(queues))
copy(msc.queues, queues)
return nil
}

func (msc *mockStore) GetQueues() ([]application.Queue, error) {
msc.lock.RLock()
defer msc.lock.RUnlock()
out := make([]application.Queue, len(msc.queues))
copy(out, msc.queues)
return out, nil
}
94 changes: 63 additions & 31 deletions application/queues/impl.go → application/queuemanager/impl.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package queues
package queuemanager

import (
"context"
Expand All @@ -14,31 +14,56 @@ import (
"time"
)

type platform interface {
FindByUID(uid string) (*application.Definition, error)
Invoke(ctx context.Context, lambda application.Lambda, request types.Request, out io.Writer) error
// Store contains queues configuration for reload
type Store interface {
// Save queues list
SetQueues(queues []application.Queue) error
// Load queues list
GetQueues() ([]application.Queue, error)
}

// Minimal required platform features
type Platform interface {
InvokeByUID(ctx context.Context, uid string, request types.Request, out io.Writer) error
}

type QueueFactory func(name string) (queue.Queue, error)

func New(ctx context.Context, platform platform, factory QueueFactory) *queueManager {
return &queueManager{
func New(ctx context.Context, config Store, platform Platform, factory QueueFactory) (*queueManager, error) {
qm := &queueManager{
ctx: ctx,
platform: platform,
queues: map[string]*queueDefinition{},
queueFactory: factory,
config: config,
}
return qm, qm.init()
}

type queueManager struct {
ctx context.Context
lock sync.RWMutex
platform platform
platform Platform
queues map[string]*queueDefinition
queueFactory QueueFactory
config Store
wg sync.WaitGroup
}

func (qm *queueManager) init() error {
list, err := qm.config.GetQueues()
if err != nil {
return err
}
for _, def := range list {
err := qm.addQueueUnsafe(def)
if err != nil {
return err
}
}
return nil
}

func (qm *queueManager) Put(queue string, request *types.Request) error {
qm.lock.RLock()
defer qm.lock.RUnlock()
Expand All @@ -51,29 +76,32 @@ func (qm *queueManager) Put(queue string, request *types.Request) error {
}

func (qm *queueManager) Add(queue application.Queue) error {
qm.lock.Lock()
defer qm.lock.Unlock()
err := qm.addQueueUnsafe(queue)
if err != nil {
return err
}
return qm.config.SetQueues(qm.listUnsafe())
}

func (qm *queueManager) addQueueUnsafe(queue application.Queue) error {
if !application.QueueNameReg.MatchString(queue.Name) {
return fmt.Errorf("invalid queue name: should be %v", application.QueueNameReg)
}
qm.lock.Lock()
defer qm.lock.Unlock()
q, ok := qm.queues[queue.Name]
if ok {
return fmt.Errorf("queue %s already exists", queue.Name)
}

lambda, err := qm.platform.FindByUID(queue.Target)
if err != nil {
return err
}

back, err := qm.queueFactory(queue.Name)
if err != nil {
return err
return fmt.Errorf("add queue %s - create backend for queue: %w", queue.Name, err)
}

q = &queueDefinition{
Queue: application.Queue{},
worker: startWorker(qm.ctx, back, lambda.Lambda, qm.platform, &qm.wg),
Queue: queue,
worker: startWorker(qm.ctx, back, queue.Target, qm.platform, &qm.wg),
queue: back,
}
if qm.queues == nil {
Expand All @@ -93,14 +121,14 @@ func (qm *queueManager) Remove(queue string) error {
q.worker.stop()
<-q.worker.done
delete(qm.queues, queue)
return q.queue.Destroy()
}

func (qm *queueManager) Assign(queue string, targetLambda string) error {
lambda, err := qm.platform.FindByUID(targetLambda)
err := q.queue.Destroy()
if err != nil {
return err
}
return qm.config.SetQueues(qm.listUnsafe())
}

func (qm *queueManager) Assign(queue string, targetLambda string) error {
qm.lock.Lock()
defer qm.lock.Unlock()
q, ok := qm.queues[queue]
Expand All @@ -110,20 +138,23 @@ func (qm *queueManager) Assign(queue string, targetLambda string) error {
q.worker.stop()
<-q.worker.done
q.Target = targetLambda
q.worker = startWorker(qm.ctx, q.queue, lambda.Lambda, qm.platform, &qm.wg)
return nil
q.worker = startWorker(qm.ctx, q.queue, targetLambda, qm.platform, &qm.wg)
return qm.config.SetQueues(qm.listUnsafe())
}

func (qm *queueManager) List() []application.Queue {
var ans = qm.listUnsafe()
sort.Slice(ans, func(i, j int) bool {
return ans[i].Name < ans[j].Name
})
return ans
}

func (qm *queueManager) listUnsafe() []application.Queue {
var ans = make([]application.Queue, 0, len(qm.queues))
qm.lock.RLock()
for _, q := range qm.queues {
ans = append(ans, q.Queue)
}
qm.lock.RUnlock()
sort.Slice(ans, func(i, j int) bool {
return ans[i].Name < ans[j].Name
})
return ans
}

Expand Down Expand Up @@ -154,12 +185,13 @@ type worker struct {
done chan struct{}
}

func startWorker(gctx context.Context, queue queue.Queue, lambda application.Lambda, plt platform, wg *sync.WaitGroup) *worker {
func startWorker(gctx context.Context, queue queue.Queue, uid string, plt Platform, wg *sync.WaitGroup) *worker {
ctx, cancel := context.WithCancel(gctx)
w := &worker{
stop: cancel,
done: make(chan struct{}),
}
wg.Add(1)
go func() {
defer wg.Done()
defer close(w.done)
Expand All @@ -174,7 +206,7 @@ func startWorker(gctx context.Context, queue queue.Queue, lambda application.Lam

if err != nil {
log.Println("queues: failed peek")
} else if err = plt.Invoke(ctx, lambda, *req, os.Stderr); err != nil {
} else if err = plt.InvokeByUID(ctx, uid, *req, os.Stderr); err != nil {
log.Println("queues: failed invoke:", err)
}
err = queue.Commit(ctx)
Expand Down
Loading

0 comments on commit ca4e2d8

Please sign in to comment.