diff --git a/go.mod b/go.mod index 7a161fe0..e13da9d4 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/suborbital/hive go 1.14 require ( + github.com/google/uuid v1.1.2 github.com/pkg/errors v0.9.1 github.com/suborbital/grav v0.0.11 - github.com/suborbital/vektor v0.1.2 + github.com/suborbital/vektor v0.1.3 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 ) diff --git a/go.sum b/go.sum index b8f10677..2e9fb0bd 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/google/go-cmp v0.4.1 h1:/exdXoGamhu5ONeUJH0deniYLWYvQwW66yvlfiiKTu0= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -16,6 +18,8 @@ github.com/suborbital/vektor v0.1.1 h1:F3n9rS1F3nc+1Q2HZxeVNinvVkCRliVQ01+jRASct github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= github.com/suborbital/vektor v0.1.2 h1:d4BvshbMl4wRVYPKO21vka7r89nlRrrZXidYQz07N9Q= github.com/suborbital/vektor v0.1.2/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= +github.com/suborbital/vektor v0.1.3 h1:rC5ic4FnjmcbizmV/WAQt67QkF6eJ7jHSsuy8IFC2bc= +github.com/suborbital/vektor v0.1.3/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= diff --git a/hive/hive.go b/hive/hive.go index 5a8f9a92..19bee4d5 100644 --- a/hive/hive.go +++ b/hive/hive.go @@ -23,9 +23,10 @@ type Hive struct { // New returns a Hive ready to accept Jobs func New() *Hive { + logger := vlog.Default() h := &Hive{ - scheduler: newScheduler(), - log: vlog.Default(), + scheduler: newScheduler(logger), + log: logger, } return h @@ -41,10 +42,7 @@ func (h *Hive) Handle(jobType string, runner Runnable, options ...Option) JobFun h.handle(jobType, runner, options...) helper := func(data interface{}) *Result { - job := Job{ - jobType: jobType, - data: data, - } + job := NewJob(jobType, data) return h.Do(job) } @@ -60,10 +58,7 @@ func (h *Hive) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options h.handle(msgType, runner, options...) helper := func(data interface{}) *Result { - job := Job{ - jobType: msgType, - data: data, - } + job := NewJob(msgType, data) return h.Do(job) } diff --git a/hive/hive_test.go b/hive/hive_test.go index 75c97bea..f3c7d019 100644 --- a/hive/hive_test.go +++ b/hive/hive_test.go @@ -34,7 +34,7 @@ func TestHiveJob(t *testing.T) { r := h.Do(h.Job("generic", "first")) - if r.ID == "" { + if r.UUID() == "" { t.Error("result ID is empty") } diff --git a/hive/job.go b/hive/job.go index 5bf3e179..31260140 100644 --- a/hive/job.go +++ b/hive/job.go @@ -4,28 +4,50 @@ import ( "encoding/json" "errors" + "github.com/google/uuid" "github.com/suborbital/grav/grav" ) -// Job describes a job to be done -type Job struct { +// JobReference is a lightweight reference to a Job +type JobReference struct { + uuid string jobType string - data interface{} result *Result } +// Job describes a job to be done +type Job struct { + JobReference + data interface{} + resultData interface{} + resultErr error +} + // NewJob creates a new job func NewJob(jobType string, data interface{}) Job { j := Job{ - jobType: jobType, - data: data, + JobReference: JobReference{ + uuid: uuid.New().String(), + jobType: jobType, + }, + data: data, } return j } +// UUID returns the Job's UUID +func (j JobReference) UUID() string { + return j.uuid +} + +// Reference returns a reference to the Job +func (j Job) Reference() JobReference { + return j.JobReference +} + // Unmarshal unmarshals the job's data into a struct -func (j *Job) Unmarshal(target interface{}) error { +func (j Job) Unmarshal(target interface{}) error { if bytes, ok := j.data.([]byte); ok { return json.Unmarshal(bytes, target) } @@ -33,16 +55,19 @@ func (j *Job) Unmarshal(target interface{}) error { return errors.New("failed to Unmarshal, job data is not []byte") } -func (j *Job) String() string { - if s, ok := j.data.(string); ok { +// String returns the string value of a job's data +func (j Job) String() string { + if s, isString := j.data.(string); isString { return s + } else if b, isBytes := j.data.([]byte); isBytes { + return string(b) } return "" } // Bytes returns the []byte value of the job's data -func (j *Job) Bytes() []byte { +func (j Job) Bytes() []byte { if v, ok := j.data.([]byte); ok { return v } @@ -51,7 +76,7 @@ func (j *Job) Bytes() []byte { } // Int returns the int value of the job's data -func (j *Job) Int() int { +func (j Job) Int() int { if v, ok := j.data.(int); ok { return v } @@ -60,12 +85,12 @@ func (j *Job) Int() int { } // Data returns the "raw" data for the job -func (j *Job) Data() interface{} { +func (j Job) Data() interface{} { return j.data } // Msg returns a grav.Message stored in the Job, if any -func (j *Job) Msg() grav.Message { +func (j Job) Msg() grav.Message { msg, ok := j.data.(grav.Message) if !ok { return nil @@ -73,3 +98,9 @@ func (j *Job) Msg() grav.Message { return msg } + +// loadResult has a pointer reciever such that it actually modifies the object it's being called on +func (j *Job) loadResult(resultData interface{}, errString string) { + j.resultData = resultData + j.resultErr = errors.New(errString) +} diff --git a/hive/result.go b/hive/result.go index beb54e2d..b674e2b5 100644 --- a/hive/result.go +++ b/hive/result.go @@ -3,26 +3,43 @@ package hive import ( "encoding/json" - "github.com/suborbital/hive/util" - "github.com/pkg/errors" ) // Result describes a result type Result struct { - ID string + uuid string data interface{} err error resultChan chan bool errChan chan bool + removeFunc removeFunc } // ResultFunc is a result callback function. type ResultFunc func(interface{}, error) +func newResult(uuid string, remove removeFunc) *Result { + r := &Result{ + uuid: uuid, + resultChan: make(chan bool, 1), // buffered, so the result can be written and related goroutines can end before Then() is called + errChan: make(chan bool, 1), + removeFunc: remove, + } + + return r +} + +// UUID returns the result/job's UUID +func (r *Result) UUID() string { + return r.uuid +} + // Then returns the result or error from a Result func (r *Result) Then() (interface{}, error) { + defer r.removeFunc(r.uuid) + select { case <-r.resultChan: return r.data, nil @@ -80,16 +97,6 @@ func (r *Result) Discard() { }() } -func newResult() *Result { - r := &Result{ - ID: util.GenerateResultID(), - resultChan: make(chan bool, 1), // buffered, so the result can be written and related goroutines can end before Then() is called - errChan: make(chan bool, 1), - } - - return r -} - func (r *Result) sendResult(data interface{}) { // if the result is another Result, // wait for its result and recursively send it diff --git a/hive/scheduler.go b/hive/scheduler.go index ca515925..d10fe867 100644 --- a/hive/scheduler.go +++ b/hive/scheduler.go @@ -5,18 +5,21 @@ import ( "sync" "github.com/pkg/errors" + "github.com/suborbital/vektor/vlog" ) type scheduler struct { workers map[string]*worker - - starter sync.Once + store Storage + logger *vlog.Logger sync.Mutex } -func newScheduler() *scheduler { +func newScheduler(logger *vlog.Logger) *scheduler { s := &scheduler{ workers: map[string]*worker{}, + store: newMemoryStorage(), + logger: logger, Mutex: sync.Mutex{}, } @@ -24,14 +27,12 @@ func newScheduler() *scheduler { } func (s *scheduler) schedule(job Job) *Result { - s.starter.Do(func() { - if s.workers == nil { - s.workers = map[string]*worker{} + result := newResult(job.UUID(), func(uuid string) { + if err := s.store.Remove(uuid); err != nil { + s.logger.Error(errors.Wrap(err, "scheduler failed to Remove Job from storage")) } }) - result := newResult() - worker := s.getWorker(job.jobType) if worker == nil { result.sendErr(fmt.Errorf("failed to getRunnable for jobType %q", job.jobType)) @@ -48,7 +49,9 @@ func (s *scheduler) schedule(job Job) *Result { } job.result = result - worker.schedule(job) + s.store.Add(job) + + worker.schedule(job.Reference()) }() return result @@ -65,7 +68,7 @@ func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option) opts = o(opts) } - w := newWorker(runnable, opts) + w := newWorker(runnable, s.store, opts) if s.workers == nil { s.workers = map[string]*worker{jobType: w} } else { diff --git a/hive/server.go b/hive/server.go index 4563cb6b..22ffb1df 100644 --- a/hive/server.go +++ b/hive/server.go @@ -89,7 +89,7 @@ func (s *Server) scheduleHandler() vk.HandlerFunc { s.addInFlight(res) resp := doResponse{ - ResultID: res.ID, + ResultID: res.UUID(), } return resp, nil @@ -155,7 +155,7 @@ func (s *Server) addInFlight(r *Result) { s.Lock() defer s.Unlock() - s.inFlight[r.ID] = r + s.inFlight[r.UUID()] = r } func (s *Server) getInFlight(id string) *Result { diff --git a/hive/storage.go b/hive/storage.go new file mode 100644 index 00000000..cc09c8f8 --- /dev/null +++ b/hive/storage.go @@ -0,0 +1,92 @@ +package hive + +import ( + "sync" + + "github.com/pkg/errors" +) + +// ErrJobNotFound and others are storage realated errors +var ( + ErrJobNotFound = errors.New("job not found in storage") +) + +// Storage represents a storage driver for Hive +type Storage interface { + Add(Job) error + AddResult(string, interface{}, error) error + Get(string) (Job, error) + Remove(string) error +} + +// MemoryStorage is the default in-memory storage driver for Hive +type MemoryStorage struct { + jobs sync.Map + results sync.Map + errors sync.Map +} + +// a function that can be given to a Result to remove a Job from storage once its result has been delivered +type removeFunc func(string) + +func newMemoryStorage() *MemoryStorage { + m := &MemoryStorage{ + jobs: sync.Map{}, + results: sync.Map{}, + errors: sync.Map{}, + } + + return m +} + +// Add adds a Job to storage +func (m *MemoryStorage) Add(job Job) error { + // store as a pointer + m.jobs.Store(job.UUID(), &job) + + return nil +} + +// AddResult adds a Job result to storage +func (m *MemoryStorage) AddResult(uuid string, data interface{}, err error) error { + if err != nil { + m.errors.Store(uuid, err.Error()) + } else { + m.results.Store(uuid, data) + } + + return nil +} + +// Get loads a Job and any of its results from storage +func (m *MemoryStorage) Get(uuid string) (Job, error) { + rawJob, ok := m.jobs.Load(uuid) + if !ok { + return Job{}, ErrJobNotFound + } + + // cast to pointer as loadResult has a pointer receiver + job := rawJob.(*Job) + + res, _ := m.results.Load(uuid) + + var errString string + + rawErr, ok := m.errors.Load(uuid) + if ok { + errString = rawErr.(string) + } + + job.loadResult(res, errString) + + return *job, nil +} + +// Remove removes a Job and its data from storage +func (m *MemoryStorage) Remove(uuid string) error { + m.jobs.Delete(uuid) + m.results.Delete(uuid) + m.errors.Delete(uuid) + + return nil +} diff --git a/hive/worker.go b/hive/worker.go index cb3fa90a..019290c6 100644 --- a/hive/worker.go +++ b/hive/worker.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/pkg/errors" @@ -20,38 +21,46 @@ var ( type worker struct { runner Runnable - workChan chan Job + workChan chan JobReference + store Storage options workerOpts threads []*workThread threadLock sync.Mutex - started bool - starter sync.Once + started atomic.Value } // newWorker creates a new goWorker -func newWorker(runner Runnable, opts workerOpts) *worker { +func newWorker(runner Runnable, store Storage, opts workerOpts) *worker { w := &worker{ runner: runner, - workChan: make(chan Job, defaultChanSize), + workChan: make(chan JobReference, defaultChanSize), + store: store, options: opts, threads: make([]*workThread, opts.poolSize), threadLock: sync.Mutex{}, - started: false, + started: atomic.Value{}, } + w.started.Store(false) + return w } -func (w *worker) schedule(job Job) { +func (w *worker) schedule(job JobReference) { go func() { w.workChan <- job }() } func (w *worker) start(doFunc DoFunc) error { - w.starter.Do(func() { w.started = true }) + // this should only be run once per worker + if isStarted := w.started.Load().(bool); isStarted { + return nil + } + + w.started.Store(true) started := 0 attempts := 0 @@ -59,7 +68,7 @@ func (w *worker) start(doFunc DoFunc) error { for { // fill the "pool" with workThreads for i := started; i < w.options.poolSize; i++ { - wt := newWorkThread(w.runner, w.workChan, w.options.jobTimeoutSeconds) + wt := newWorkThread(w.runner, w.workChan, w.store, w.options.jobTimeoutSeconds) // give the runner opportunity to provision resources if needed if err := w.runner.OnStart(); err != nil { @@ -90,23 +99,25 @@ func (w *worker) start(doFunc DoFunc) error { } func (w *worker) isStarted() bool { - return w.started + return w.started.Load().(bool) } type workThread struct { runner Runnable - workChan chan Job + workChan chan JobReference + store Storage timeoutSeconds int ctx context.Context cancelFunc context.CancelFunc } -func newWorkThread(runner Runnable, workChan chan Job, timeoutSeconds int) *workThread { +func newWorkThread(runner Runnable, workChan chan JobReference, store Storage, timeoutSeconds int) *workThread { ctx, cancelFunc := context.WithCancel(context.Background()) wt := &workThread{ runner: runner, workChan: workChan, + store: store, timeoutSeconds: timeoutSeconds, ctx: ctx, cancelFunc: cancelFunc, @@ -124,10 +135,16 @@ func (wt *workThread) run(doFunc DoFunc) { } // wait for the next job - job := <-wt.workChan + jobRef := <-wt.workChan + + // fetch the full job from storage + job, err := wt.store.Get(jobRef.uuid) + if err != nil { + jobRef.result.sendErr(err) + continue + } var result interface{} - var err error if wt.timeoutSeconds == 0 { result, err = wt.runner.Run(job, doFunc) @@ -135,12 +152,14 @@ func (wt *workThread) run(doFunc DoFunc) { result, err = wt.runWithTimeout(job, doFunc) } + wt.store.AddResult(job.UUID(), result, err) + if err != nil { - job.result.sendErr(err) + jobRef.result.sendErr(err) continue } - job.result.sendResult(result) + jobRef.result.sendResult(result) } }() }