Skip to content

Commit

Permalink
MAKE-339: add time info to job interface (evergreen-ci#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Jan 31, 2018
1 parent e64fddc commit f3377ea
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 20 deletions.
21 changes: 21 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ type Job interface {
Status() JobStatusInfo
SetStatus(JobStatusInfo)

// TimeInfo reports the start/end time of jobs, as well as
// providing for a "wait until" functionality that queues can
// use to schedule jobs in the future. The update method, only
// updates non-zero methods.
TimeInfo() JobTimeInfo
UpdateTimeInfo(JobTimeInfo)

// Provides access to the job's priority value, which some
// queues may use to order job dispatching. Most Jobs
// implement these values by composing the
Expand Down Expand Up @@ -79,6 +86,20 @@ type JobStatusInfo struct {
ModificationCount int `bson:"mod_count" json:"mod_count" yaml:"mod_count"`
}

// JobTimeInfo stores timing information for a job and is used by both
// the Runner and Job implementations to track how long jobs take to
// execute. Additionally, the Queue implementations __may__ use this
// data to delay execution of a job when WaitUntil refers to a time
// in the future.
type JobTimeInfo struct {
Start time.Time `bson:"start,omitempty" json:"start,omitempty" yaml:"start,omitempty"`
End time.Time `bson:"end,omitempty" json:"end,omitempty" yaml:"end,omitempty"`
WaitUntil time.Time `bson:"wait_until,omitempty" json:"wait_until,omitempty" yaml:"wait_until,omitempty"`
}

// Duration is a convenience function to return a duration for a job.
func (j JobTimeInfo) Duration() time.Duration { return j.End.Sub(j.Start) }

// Queue describes a very simple Job queue interface that allows users
// to define Job objects, add them to a worker queue and execute tasks
// from that queue. Queue implementations may run locally or as part
Expand Down
37 changes: 33 additions & 4 deletions job/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ import (
// an implementation of most common Job methods which most jobs
// need not implement themselves.
type Base struct {
TaskID string `bson:"name" json:"name" yaml:"name"`
JobType amboy.JobType `bson:"job_type" json:"job_type" yaml:"job_type"`
Errors []string `bson:"errors" json:"errors" yaml:"errors"`
PriorityValue int `bson:"priority" json:"priority" yaml:"priority"`
TaskID string `bson:"name" json:"name" yaml:"name"`
JobType amboy.JobType `bson:"job_type" json:"job_type" yaml:"job_type"`
Errors []string `bson:"errors" json:"errors" yaml:"errors"`
PriorityValue int `bson:"priority" json:"priority" yaml:"priority"`
TaskTimeInfo amboy.JobTimeInfo `bson:"time_info" json:"time_info" yaml:"time_info"`

status amboy.JobStatusInfo
dep dependency.Manager
Expand Down Expand Up @@ -178,3 +179,31 @@ func (b *Base) SetStatus(s amboy.JobStatusInfo) {
b.status = s

}

// TimeInfo returns the job's TimeInfo object. The runner
// implementations are responsible for updating these values.
func (b *Base) TimeInfo() amboy.JobTimeInfo {
b.mutex.RLock()
defer b.mutex.RUnlock()

return b.TaskTimeInfo
}

// UpdateTimeInfo updates the stored value of time in the job, but
// does *not* modify fields that are unset in the input document.
func (b *Base) UpdateTimeInfo(i amboy.JobTimeInfo) {
b.mutex.Lock()
defer b.mutex.Unlock()

if !i.Start.IsZero() {
b.TaskTimeInfo.Start = i.Start
}

if !i.End.IsZero() {
b.TaskTimeInfo.End = i.End
}

if !i.WaitUntil.IsZero() {
b.TaskTimeInfo.WaitUntil = i.WaitUntil
}
}
44 changes: 42 additions & 2 deletions job/base_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package job

import (
"errors"
"strings"
"testing"
"time"

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/dependency"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (s *BaseCheckSuite) TestAddErrorWithNilObjectDoesNotChangeErrorState() {

func (s *BaseCheckSuite) TestAddErrorsPersistsErrorsInJob() {
for i := 1; i <= 100; i++ {
s.base.AddError(errors.New("foo"))
s.base.AddError(erbrors.New("foo"))
s.Error(s.base.Error())
s.Len(s.base.Errors, i)
s.True(s.base.HasErrors())
Expand Down Expand Up @@ -80,3 +81,42 @@ func (s *BaseCheckSuite) TestMarkCompleteHelperSetsCompleteState() {

s.True(s.base.status.Completed)
}

func (s *BaseCheckSuite) TestDefaultTimeInfoIsUnset() {
ti := s.base.TimeInfo()
s.Zero(ti.Start)
s.Zero(ti)
s.Zero(ti.End)
s.Zero(ti.WaitUntil)
}

func (s *BaseCheckSuite) TestTimeInfoSetsValues() {
ti := s.base.TimeInfo()
ti.Start = time.Now()
ti.End = ti.Start.Add(time.Hour)
s.Zero(ti.WaitUntil)
s.Equal(time.Hour, ti.Duration())

new := amboy.JobTimeInfo{}
s.base.UpdateTimeInfo(ti)
s.NotEqual(new, s.base.TimeInfo())
s.Zero(s.base.TimeInfo().WaitUntil)

new.End = ti.Start.Add(time.Minute)
s.base.UpdateTimeInfo(new)
result := s.base.TimeInfo()
s.Equal(ti.Start, result.Start)
s.Equal(time.Minute, result.Duration())
s.Equal(new.End, result.End)
s.NotEqual(ti.End, result.End)
s.Zero(s.base.TimeInfo().WaitUntil)

new = amboy.JobTimeInfo{WaitUntil: time.Now()}
s.base.UpdateTimeInfo(new)
last := s.base.TimeInfo()
s.Equal(new.WaitUntil, last.WaitUntil)
s.NotEqual(new.Start, last.Start)
s.NotEqual(new.End, last.End)
s.Equal(result.Start, last.Start)
s.Equal(result.End, last.End)
}
7 changes: 2 additions & 5 deletions job/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os/exec"
"strings"
"sync"

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/dependency"
Expand All @@ -19,9 +18,7 @@ type ShellJob struct {
WorkingDir string `bson:"working_dir" json:"working_dir" yaml:"working_dir"`
Env map[string]string `bson:"env" json:"env" yaml:"env"`

*Base `bson:"job_base" json:"job_base" yaml:"job_base"`

mutex sync.RWMutex
Base `bson:"job_base" json:"job_base" yaml:"job_base"`
}

// NewShellJob takes the command, as a string along with the name of a
Expand Down Expand Up @@ -49,7 +46,7 @@ func NewShellJob(cmd string, creates string) *ShellJob {
func NewShellJobInstance() *ShellJob {
j := &ShellJob{
Env: make(map[string]string),
Base: &Base{
Base: Base{
JobType: amboy.JobType{
Name: "shell",
Version: 1,
Expand Down
7 changes: 7 additions & 0 deletions pool/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/mongodb/amboy"
"github.com/mongodb/grip"
Expand Down Expand Up @@ -205,7 +206,13 @@ func groupWorker(ctx context.Context, wg *sync.WaitGroup, name string, work <-ch
case <-ctx.Done():
return
default:
ti := amboy.JobTimeInfo{
Start: time.Now(),
}
unit.j.Run()
ti.End = time.Now()

unit.j.UpdateTimeInfo(ti)
unit.q.Complete(ctx, unit.j)
}
}
Expand Down
7 changes: 7 additions & 0 deletions pool/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"errors"
"sync"
"time"

"github.com/mongodb/amboy"
"github.com/mongodb/grip"
Expand Down Expand Up @@ -121,7 +122,13 @@ func worker(ctx context.Context, jobs <-chan amboy.Job, q amboy.Queue, wg *sync.
continue
}

ti := amboy.JobTimeInfo{
Start: time.Now(),
}

job.Run()
ti.End = time.Now()
job.UpdateTimeInfo(ti)
q.Complete(ctx, job)
}
}
Expand Down
7 changes: 7 additions & 0 deletions pool/rate_limiting.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@ func (p *simpleRateLimited) worker(ctx context.Context, jobs <-chan amboy.Job) {
case <-ctx.Done():
return
case job := <-jobs:
ti := amboy.JobTimeInfo{
Start: time.Now(),
}

job.Run()
ti.End = time.Now()
job.UpdateTimeInfo(ti)

p.queue.Complete(ctx, job)
timer.Reset(p.interval)
}
Expand Down
7 changes: 7 additions & 0 deletions pool/rate_limiting_average.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,14 @@ func (p *ewmaRateLimiting) worker(ctx context.Context, jobs <-chan amboy.Job) {

func (p *ewmaRateLimiting) runJob(ctx context.Context, j amboy.Job) time.Duration {
start := time.Now()
ti := amboy.JobTimeInfo{
Start: start,
}

j.Run()
ti.End = time.Now()
j.UpdateTimeInfo(ti)

p.queue.Complete(ctx, j)
duration := time.Since(start)

Expand Down
6 changes: 0 additions & 6 deletions pool/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/job"
"github.com/mongodb/grip"
"github.com/mongodb/grip/level"
"github.com/stretchr/testify/suite"
)

Expand All @@ -22,10 +20,6 @@ func TestSingleWorkerSuite(t *testing.T) {
suite.Run(t, new(SingleRunnerSuite))
}

func (s *SingleRunnerSuite) SetupSuite() {
s.NoError(grip.SetThreshold(level.Info))
}

func (s *SingleRunnerSuite) SetupTest() {
s.pool = NewSingle().(*single)
s.queue = NewQueueTester(s.pool)
Expand Down
2 changes: 1 addition & 1 deletion queue/driver_mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (d *mongoDB) Save(j amboy.Job) error {
info, err := jobs.Upsert(d.getAtomicQuery(j.ID(), j.Status()), job)
if err != nil {
err = errors.Wrapf(err, "problem updating %s: %+v", name, info)
grip.Warning(err)
grip.Alert(err)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion queue/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *OrderedQueueSuite) TestPuttingAJobIntoAQueueImpactsStats() {
jReturn, ok := s.queue.Get(j.ID())
s.True(ok)

base := &job.Base{}
base := job.Base{}
jActual := jReturn.(*job.ShellJob)
jActual.Base = base
j.Base = base
Expand Down
7 changes: 6 additions & 1 deletion queue/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {
grip.CatchError(grip.SetSender(send.MakeNative()))

lvl := grip.GetSender().Level()
lvl.Threshold = level.Emergency
lvl.Threshold = level.Alert
_ = grip.GetSender().SetLevel(lvl)

job.RegisterDefaultJobs()
Expand Down Expand Up @@ -73,6 +73,11 @@ func runUnorderedSmokeTest(ctx context.Context, q amboy.Queue, size int, assert
assert.Equal(numJobs, q.Stats().Completed, fmt.Sprintf("%+v", q.Stats()))
for result := range q.Results(ctx) {
assert.True(result.Status().Completed, fmt.Sprintf("with %d workers", size))

// assert that we had valid time info persisted
ti := result.TimeInfo()
assert.NotZero(ti.Start)
assert.NotZero(ti.End)
}

statCounter := 0
Expand Down
3 changes: 3 additions & 0 deletions registry/interchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type JobInterchange struct {
Job *rawJob `json:"job,omitempty" bson:"job,omitempty" yaml:"job,omitempty"`
Dependency *DependencyInterchange `json:"dependency,omitempty" bson:"dependency,omitempty" yaml:"dependency,omitempty"`
Status amboy.JobStatusInfo `bson:"status" json:"status" yaml:"status"`
TimeInfo amboy.JobTimeInfo `bson:"time_info" json:"time_info" yaml:"time_info"`
}

// MakeJobInterchange changes a Job interface into a JobInterchange
Expand Down Expand Up @@ -45,6 +46,7 @@ func MakeJobInterchange(j amboy.Job) (*JobInterchange, error) {
Version: typeInfo.Version,
Priority: j.Priority(),
Status: j.Status(),
TimeInfo: j.TimeInfo(),
Job: &rawJob{
Body: data,
Type: typeInfo.Name,
Expand Down Expand Up @@ -88,6 +90,7 @@ func ConvertToJob(j *JobInterchange) (amboy.Job, error) {
job.SetDependency(dep)
job.SetPriority(j.Priority)
job.SetStatus(j.Status)
job.UpdateTimeInfo(j.TimeInfo)

return job, nil
}
Expand Down
19 changes: 19 additions & 0 deletions registry/interchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package registry

import (
"testing"
"time"

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/dependency"
Expand Down Expand Up @@ -101,6 +102,24 @@ func (s *JobInterchangeSuite) TestMismatchedDependencyCausesJobConversionToError
}
}

func (s *JobInterchangeSuite) TestTimeInfoPersists() {
now := time.Now()
ti := amboy.JobTimeInfo{Start: now, End: now.Add(time.Hour), WaitUntil: now.Add(-time.Minute)}
s.job.UpdateTimeInfo(ti)
s.Equal(ti, s.job.timeInfo)

i, err := MakeJobInterchange(s.job)
if s.NoError(err) {
s.Equal(i.TimeInfo, ti)

j, err := ConvertToJob(i)
s.NoError(err)
s.NotNil(j)
s.Equal(ti, j.TimeInfo())
}

}

// DependencyInterchangeSuite tests the DependencyInterchange format
// and converters. This type provides a way for Jobs and Queues to
// serialize their objects quasi-generically.
Expand Down
8 changes: 8 additions & 0 deletions registry/mock_job_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type JobTest struct {
shouldFail bool
hadError bool
status amboy.JobStatusInfo
timeInfo amboy.JobTimeInfo
T amboy.JobType
dep dependency.Manager
priority int
Expand Down Expand Up @@ -98,5 +99,12 @@ func (j *JobTest) Status() amboy.JobStatusInfo {

func (j *JobTest) SetStatus(s amboy.JobStatusInfo) {
j.status = s
}

func (j *JobTest) TimeInfo() amboy.JobTimeInfo {
return j.timeInfo
}

func (j *JobTest) UpdateTimeInfo(i amboy.JobTimeInfo) {
j.timeInfo = i
}

0 comments on commit f3377ea

Please sign in to comment.