fixed size worker pool to serve new job requests (#72)
* A fixed number of goroutines listen on a queue for new job requests
* Incoming HTTP job requests place work onto the queue
* The result is communicated back to the HTTP request goroutine through
a per-request, unique result channel
dtheodor committed May 8, 2018
1 parent 0c8117b commit 73c44ec
Showing 11 changed files with 263 additions and 26 deletions.
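
At its core the change is the classic fixed-size worker pool over a buffered channel. As a rough, self-contained sketch of the pattern (names here are simplified for illustration; the actual implementation is in cmd/mistryd/worker_pool.go below):

package main

import (
	"fmt"
	"sync"
)

// workItem pairs a job with its own result channel, so each request gets
// its answer back independently.
type workItem struct {
	id     int
	result chan string
}

func main() {
	const concurrency, backlog = 4, 8
	queue := make(chan workItem, backlog)

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range queue {
				// "do the work", then report back on the per-request channel
				item.result <- fmt.Sprintf("job %d done", item.id)
				close(item.result)
			}
		}()
	}

	// enqueue without blocking, as SendWork does; a full queue is an error
	item := workItem{id: 1, result: make(chan string, 1)}
	select {
	case queue <- item:
		fmt.Println(<-item.result) // wait for a worker's answer
	default:
		fmt.Println("backlog full") // the HTTP handler maps this case to a 503
	}

	close(queue)
	wg.Wait()
}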
14 changes: 14 additions & 0 deletions cmd/mistryd/config.go
@@ -5,6 +5,7 @@ import (
"errors"
"io"
"os"
"runtime"
"strconv"

"github.com/skroutz/mistry/pkg/filesystem"
@@ -21,6 +22,9 @@ type Config struct {
ProjectsPath string `json:"projects_path"`
BuildPath string `json:"build_path"`
Mounts map[string]string `json:"mounts"`

Concurrency int `json:"job_concurrency"`
Backlog int `json:"job_backlog"`
}

// ParseConfig accepts the listening address, a filesystem adapter and a
@@ -55,5 +59,15 @@ func ParseConfig(addr string, fs filesystem.FileSystem, r io.Reader) (*Config, error) {
return nil, err
}

if cfg.Concurrency == 0 {
// the work is CPU-bound, so the number of cores is a sensible default
cfg.Concurrency = runtime.NumCPU()
}

if cfg.Backlog == 0 {
// by default, allow a request spike of double the worker capacity
cfg.Backlog = cfg.Concurrency * 2
}

return cfg, nil
}
4 changes: 3 additions & 1 deletion cmd/mistryd/config.sample.json
@@ -3,5 +3,7 @@
"build_path": "/var/lib/mistry/data",
"mounts": {
"/var/lib/mistry/.ssh": "/home/mistry/.ssh"
}
},
"job_concurrency": 5,
"job_backlog": 20
}
4 changes: 3 additions & 1 deletion cmd/mistryd/config.test.json
@@ -3,5 +3,7 @@
"build_path": "/tmp",
"mounts": {
"/tmp": "/tmp"
}
},
"job_concurrency": 5,
"job_backlog": 100
}
8 changes: 7 additions & 1 deletion cmd/mistryd/job.go
@@ -35,6 +35,7 @@ type Job struct {
Project string
Params types.Params
Group string
Rebuild bool

RootBuildPath string
PendingBuildPath string
@@ -62,7 +63,12 @@ type Job struct {

// NewJobFromRequest returns a new Job from the JobRequest
func NewJobFromRequest(jr types.JobRequest, cfg *Config) (*Job, error) {
return NewJob(jr.Project, jr.Params, jr.Group, cfg)
j, err := NewJob(jr.Project, jr.Params, jr.Group, cfg)
if err != nil {
return nil, err
}
j.Rebuild = jr.Rebuild
return j, nil
}

// NewJob returns a new Job for the given project. project and cfg cannot be
1 change: 1 addition & 0 deletions cmd/mistryd/main.go
@@ -181,5 +181,6 @@ func StartServer(cfg *Config) error {
}()
s.Log.Printf("Listening on %s...", cfg.Addr)
wg.Wait()
s.workerPool.Stop()
return nil
}
53 changes: 32 additions & 21 deletions cmd/mistryd/server.go
@@ -37,10 +37,11 @@ import (
type Server struct {
Log *log.Logger

srv *http.Server
jq *JobQueue
pq *ProjectQueue
cfg *Config
srv *http.Server
jq *JobQueue
pq *ProjectQueue
cfg *Config
workerPool *WorkerPool

// web-view related

Expand Down Expand Up @@ -87,6 +88,7 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) {
s.pq = NewProjectQueue()
s.br = broker.NewBroker(s.Log)
s.tq = new(sync.Map)
s.workerPool = NewWorkerPool(s, cfg.Concurrency, cfg.Backlog, logger)
return s, nil
}

@@ -118,28 +120,43 @@ func (s *Server) HandleNewJob(w http.ResponseWriter, r *http.Request) {
return
}

if _, isAsync := r.URL.Query()["async"]; isAsync {
s.handleNewJobAsync(j, jr, w)
// send the work item to the worker pool
future, err := s.workerPool.SendWork(j)
if err != nil {
// the in-memory queue is full; the workers need time to drain it.
// Return 503 so clients know the server is overloaded and should retry
// later. 503 fits because the overload affects all users; 429 would only
// be appropriate if we implemented per-user throttling.
s.Log.Print("Failed to send message to work queue")
w.WriteHeader(http.StatusServiceUnavailable)
return
}

// if async, we're done; otherwise wait for the result on the result channel
_, async := r.URL.Query()["async"]
if async {
s.Log.Printf("Scheduled %s", j)
w.WriteHeader(http.StatusCreated)
} else {
s.handleNewJobSync(j, jr, w)
s.Log.Printf("Waiting for result of %s...", j)
s.writeWorkResult(j, future.Wait(), w)
}
}

// handleNewJobSync triggers the build synchronously, and writes the
// build result JSON to the response
func (s *Server) handleNewJobSync(j *Job, jr types.JobRequest, w http.ResponseWriter) {
s.Log.Printf("Building %s...", j)
buildInfo, err := s.Work(context.Background(), j, jr)
if err != nil {
http.Error(w, fmt.Sprintf("Error building %s: %s", j, err),
func (s *Server) writeWorkResult(j *Job, r WorkResult, w http.ResponseWriter) {
if r.Err != nil {
http.Error(w, fmt.Sprintf("Error building %s: %s", j, r.Err),
http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)

resp, err := json.Marshal(buildInfo)
resp, err := json.Marshal(r.BuildInfo)
if err != nil {
s.Log.Print(err)
}
@@ -149,12 +166,6 @@ func (s *Server) handleNewJobSync(j *Job, jr types.JobRequest, w http.ResponseWriter) {
}
}

func (s *Server) handleNewJobAsync(j *Job, jr types.JobRequest, w http.ResponseWriter) {
s.Log.Printf("Scheduling %s...", j)
go s.Work(context.Background(), j, jr)
w.WriteHeader(http.StatusCreated)
}

// HandleIndex returns all available jobs.
func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
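
Since HandleNewJob above now answers 503 when the backlog is full, clients are expected to back off and retry. A hypothetical client loop might look like the following (the endpoint, payload, and retry policy are illustrative only and not part of this commit):

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// postJob submits a job request, retrying while the server answers 503.
func postJob(url string, body []byte) (*http.Response, error) {
	for attempt := 1; ; attempt++ {
		resp, err := http.Post(url, "application/json", bytes.NewReader(body))
		if err != nil {
			return nil, err
		}
		if resp.StatusCode != http.StatusServiceUnavailable || attempt == 5 {
			return resp, nil
		}
		resp.Body.Close()
		// simple linear backoff before the next attempt
		time.Sleep(time.Duration(attempt) * time.Second)
	}
}

func main() {
	// hypothetical endpoint and payload, for illustration only
	resp, err := postJob("http://mistry.example/jobs", []byte(`{"project":"simple"}`))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}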
8 changes: 8 additions & 0 deletions cmd/mistryd/testdata/projects/sleep/Dockerfile
@@ -0,0 +1,8 @@
FROM debian:stretch

COPY docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh
RUN chmod +x /usr/local/bin/docker-entrypoint.sh

WORKDIR /data

ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
4 changes: 4 additions & 0 deletions cmd/mistryd/testdata/projects/sleep/docker-entrypoint.sh
@@ -0,0 +1,4 @@
#!/bin/bash
set -e

sleep 10
4 changes: 2 additions & 2 deletions cmd/mistryd/worker.go
@@ -19,7 +19,7 @@ import (

// Work performs the work denoted by j and returns a BuildInfo upon
// successful completion, or an error.
func (s *Server) Work(ctx context.Context, j *Job, jr types.JobRequest) (buildInfo *types.BuildInfo, err error) {
func (s *Server) Work(ctx context.Context, j *Job) (buildInfo *types.BuildInfo, err error) {
log := log.New(os.Stderr, fmt.Sprintf("[worker] [%s] ", j), log.LstdFlags)
start := time.Now()
_, err = os.Stat(j.ReadyBuildPath)
@@ -176,7 +176,7 @@ func (s *Server) Work(ctx context.Context, j *Job, jr types.JobRequest) (buildInfo *types.BuildInfo, err error) {
return
}

err = j.BuildImage(ctx, s.cfg.UID, client, out, jr.Rebuild, jr.Rebuild)
err = j.BuildImage(ctx, s.cfg.UID, client, out, j.Rebuild, j.Rebuild)
if err != nil {
err = workErr("could not build docker image", err)
return
113 changes: 113 additions & 0 deletions cmd/mistryd/worker_pool.go
@@ -0,0 +1,113 @@
package main

import (
"context"
"errors"
"fmt"
"log"
"sync"

"github.com/skroutz/mistry/pkg/types"
)

// WorkResult contains the result of a job: either a BuildInfo or an error
type WorkResult struct {
BuildInfo *types.BuildInfo
Err error
}

// FutureWorkResult is a WorkResult that may not yet have become available and
// can be Wait()'ed on
type FutureWorkResult struct {
result <-chan WorkResult
}

// Wait waits for the result to become available and returns it
func (f FutureWorkResult) Wait() WorkResult {
r, ok := <-f.result
if !ok {
// this should never happen, reading from the result channel is exclusive to
// this future
panic("Failed to read from result channel")
}
return r
}

// workItem pairs a job with the channel on which its result will be placed.
// It is the unit sent over the internal work queue
type workItem struct {
job *Job
result chan<- WorkResult
}

// WorkerPool implements a fixed-size pool of worker goroutines that can be sent
// build jobs and communicate their result
type WorkerPool struct {
// the fixed number of goroutines that handle incoming jobs
concurrency int

// the maximum backlog of pending requests. if exceeded, sending new work
// to the pool will return an error
backlogSize int

queue chan workItem
wg sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(s *Server, concurrency, backlog int, logger *log.Logger) *WorkerPool {
p := new(WorkerPool)
p.concurrency = concurrency
p.backlogSize = backlog
p.queue = make(chan workItem, backlog)

for i := 0; i < concurrency; i++ {
p.wg.Add(1)
go work(s, i, p.queue, &p.wg)
}
logger.Printf("Set up %d workers", concurrency)
return p
}

// Stop signals the workers to close and blocks until they are closed.
func (p *WorkerPool) Stop() {
close(p.queue)
p.wg.Wait()
}

// SendWork schedules work on p and returns a FutureWorkResult. The actual result can be
// obtained by FutureWorkResult.Wait(). An error is returned if the backlog is full and
// cannot accept any new work items
func (p *WorkerPool) SendWork(j *Job) (FutureWorkResult, error) {
resultQueue := make(chan WorkResult, 1)
wi := workItem{j, resultQueue}
result := FutureWorkResult{resultQueue}

select {
case p.queue <- wi:
return result, nil
default:
return result, errors.New("queue is full")
}
}

// work consumes the queue, runs s.Work() on each incoming item, and sends
// the result back on the item's result channel
func work(s *Server, id int, queue <-chan workItem, wg *sync.WaitGroup) {
defer wg.Done()
logPrefix := fmt.Sprintf("[worker %d]", id)
for item := range queue {
s.Log.Printf("%s received work item %#v", logPrefix, item)
buildInfo, err := s.Work(context.Background(), item.job)

select {
case item.result <- WorkResult{buildInfo, err}:
s.Log.Printf("%s wrote result to the result channel", logPrefix)
default:
// this should never happen; the result channel is buffered and written to only once
s.Log.Panicf("%s failed to write result to the result channel", logPrefix)
}
close(item.result)
}
s.Log.Printf("%s exiting...", logPrefix)
}
76 changes: 76 additions & 0 deletions cmd/mistryd/worker_pool_test.go
@@ -0,0 +1,76 @@
package main

import (
"testing"
"time"

"github.com/skroutz/mistry/pkg/types"
)

func TestBacklogLimit(t *testing.T) {
wp, cfg := setupQueue(t, 1, 0)
defer wp.Stop()

params := types.Params{"test": "pool-backlog-limit"}
params2 := types.Params{"test": "pool-backlog-limit2"}
project := "simple"

sendWorkNoErr(wp, project, params, cfg, t)
_, _, err := sendWork(wp, project, params2, cfg, t)

if err == nil {
t.Fatal("Expected error")
}
}

func TestConcurrency(t *testing.T) {
// instantiate a server with 1 worker
wp, cfg := setupQueue(t, 1, 100)
defer wp.Stop()

project := "sleep"
params := types.Params{"test": "pool-concurrency"}
params2 := types.Params{"test": "pool-concurrency2"}

sendWorkNoErr(wp, project, params, cfg, t)
// give the worker a chance to start working
time.Sleep(1 * time.Second)

j, _ := sendWorkNoErr(wp, project, params2, cfg, t)

// the queue should contain only 1 item, the work item for the 2nd job
assertEq(len(wp.queue), 1, t)
select {
case i, ok := <-wp.queue:
if !ok {
t.Fatalf("Unexpectedly closed worker pool queue")
}
assertEq(i.job, j, t)
default:
t.Fatalf("Expected to find a work item in the queue")
}
}

func setupQueue(t *testing.T, workers, backlog int) (*WorkerPool, *Config) {
cfg := testcfg
cfg.Concurrency = workers
cfg.Backlog = backlog

s, err := NewServer(cfg, nil)
failIfError(err, t)
return s.workerPool, cfg
}

func sendWork(wp *WorkerPool, project string, params types.Params, cfg *Config, t *testing.T) (*Job, FutureWorkResult, error) {
j, err := NewJob(project, params, "", cfg)
failIfError(err, t)

r, err := wp.SendWork(j)
return j, r, err
}

func sendWorkNoErr(wp *WorkerPool, project string, params types.Params, cfg *Config, t *testing.T) (*Job, FutureWorkResult) {
j, r, err := sendWork(wp, project, params, cfg, t)
failIfError(err, t)
return j, r
}
