Skip to content

Commit

Permalink
MAKE-117: remove wait from queue interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Dec 7, 2016
1 parent fb04937 commit 3c6a620
Show file tree
Hide file tree
Showing 21 changed files with 115 additions and 111 deletions.
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Consider the following example:
// handle error case
}
queue.Wait() // waits for all tasks to finish.
Wait(queue) // waits for all tasks to finish.
queue.Close() // waits for all tasks to finish and releases
// all resources.
*/
Expand Down
5 changes: 1 addition & 4 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Queue interface {
Get(string) (Job, bool)

// Returns the next job in the queue. These calls are
// non-blocking and return errors
// blocking, but may be interrupted with a canceled context.
Next(context.Context) Job

// Makes it possible to detect if a Queue has started
Expand Down Expand Up @@ -104,9 +104,6 @@ type Queue interface {
// Begins the execution of the job Queue, using the embedded
// Runner.
Start(context.Context) error

// Waits for all jobs to complete.
Wait()
}

// Runner describes a simple worker interface for executing jobs in
Expand Down
4 changes: 2 additions & 2 deletions pool/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func (s *UnorderedGroupSuite) TestPoolStartsAndProcessesJobs() {
s.True(queueTwo.Started())

s.True(s.pool.Started())
queueTwo.Wait()
queueOne.Wait()
amboy.Wait(queueTwo)
amboy.Wait(queueOne)

for _, job := range jobsOne {
s.True(job.Completed(), fmt.Sprintf("%T\n\t%+v", job, job))
Expand Down
2 changes: 1 addition & 1 deletion pool/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *LocalWorkersSuite) TestPoolStartsAndProcessesJobs() {
s.True(s.pool.Started())
s.True(s.queue.Started())

s.queue.Wait()
amboy.Wait(s.queue)

counter := 0
for j := range s.queue.Results() {
Expand Down
11 changes: 0 additions & 11 deletions pool/mock_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/mongodb/amboy"
"github.com/pkg/errors"
"github.com/tychoish/grip"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -130,16 +129,6 @@ func (q *QueueTester) Results() <-chan amboy.Job {
return output
}

func (q *QueueTester) Wait() {
for {
grip.Debugf("test queue status: %+v\n", q.Stats())

if q.isComplete() {
return
}
}
}

func (q *QueueTester) isComplete() bool {
q.mutex.RLock()
defer q.mutex.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion pool/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *SingleRunnerSuite) TestPoolStartsAndProcessesJobs() {
s.True(s.pool.Started())
s.True(s.queue.Started())

s.queue.Wait()
amboy.Wait(s.queue)

for _, job := range jobs {
s.True(job.Completed())
Expand Down
11 changes: 0 additions & 11 deletions queue/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,3 @@ func (q *LocalLimitedSize) Start(ctx context.Context) error {
grip.Info("job server running")
return nil
}

// Wait blocks until all tasks are complete.
func (q *LocalLimitedSize) Wait() {
for {
stats := q.Stats()
grip.Debugf("%d jobs complete of %d total", stats.Completed, stats.Total)
if stats.Total == stats.Completed {
break
}
}
}
2 changes: 1 addition & 1 deletion queue/fixed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *LimitedSizeQueueSuite) TestGetMethodOnlyReturnsCompletedJobs() {
jobs[j.ID()] = j
}

s.queue.Wait()
amboy.Wait(s.queue)
s.queue.Runner().Close()

for name, j := range jobs {
Expand Down
13 changes: 0 additions & 13 deletions queue/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/mongodb/amboy/pool"
"github.com/pkg/errors"
"github.com/tychoish/grip"
"github.com/tychoish/grip/sometimes"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -343,15 +342,3 @@ func (q *LocalOrdered) Complete(ctx context.Context, j amboy.Job) {
defer q.mutex.Unlock()
q.tasks.completed[j.ID()] = true
}

// Wait blocks until all pending jobs in the queue are complete.
func (q *LocalOrdered) Wait() {
for {
stats := q.Stats()
grip.DebugWhenf(sometimes.Fifth(),
"waiting for %d pending jobs (total=%d)", stats.Pending, stats.Total)
if stats.Pending == 0 {
break
}
}
}
7 changes: 4 additions & 3 deletions queue/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package queue
import (
"testing"

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/dependency"
"github.com/mongodb/amboy/job"
"github.com/mongodb/amboy/pool"
Expand Down Expand Up @@ -112,7 +113,7 @@ func (s *OrderedQueueSuite) TestResultsChannelProducesPointersToConsistentJobObj
s.NoError(s.queue.Put(job))
s.NoError(s.queue.Start(ctx))

s.queue.Wait()
amboy.Wait(s.queue)

result, ok := <-s.queue.Results()
s.True(ok)
Expand All @@ -128,7 +129,7 @@ func (s *OrderedQueueSuite) TestQueueCanOnlyBeStartedOnce() {
s.NoError(s.queue.Start(ctx))
s.True(s.queue.Started())

s.queue.Wait()
amboy.Wait(s.queue)
s.True(s.queue.Started())

// you can call start more than once until the queue has
Expand Down Expand Up @@ -210,7 +211,7 @@ func (s *OrderedQueueSuite) TestPassedIsCompletedButDoesNotRun() {
defer cancel()

s.NoError(s.queue.Start(ctx))
s.queue.Wait()
amboy.Wait(s.queue)
s.False(j1.Completed())
s.True(j2.Completed())
}
13 changes: 0 additions & 13 deletions queue/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/mongodb/amboy/pool"
"github.com/mongodb/amboy/queue/driver"
"github.com/tychoish/grip"
"github.com/tychoish/grip/sometimes"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -161,15 +160,3 @@ func (q *LocalPriorityQueue) Start(ctx context.Context) error {

return nil
}

// Wait blocks until there is no pending work remaining in the queue.
func (q *LocalPriorityQueue) Wait() {
for {
stats := q.Stats()
grip.DebugWhenf(sometimes.Fifth(),
"%d jobs complete of %d total", stats.Completed, stats.Total)
if stats.Total == stats.Completed {
break
}
}
}
11 changes: 0 additions & 11 deletions queue/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,3 @@ func (q *RemoteUnordered) Start(ctx context.Context) error {

return nil
}

// Wait blocks until there are no pending jobs in the queue.
func (q *RemoteUnordered) Wait() {
for {
stats := q.Stats()
if stats.Total == stats.Completed {
break
}
time.Sleep(50 * time.Millisecond)
}
}
13 changes: 0 additions & 13 deletions queue/shuffled.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/mongodb/amboy"
"github.com/pkg/errors"
"github.com/tychoish/grip"
"github.com/tychoish/grip/sometimes"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -261,15 +260,3 @@ func (q *LocalShuffled) SetRunner(r amboy.Runner) error {
func (q *LocalShuffled) Runner() amboy.Runner {
return q.runner
}

// Wait blocks until all submitted jobs are complete.
func (q *LocalShuffled) Wait() {
for {
stats := q.Stats()
grip.DebugWhenf(sometimes.Fifth(),
"%d jobs complete of %d total", stats.Completed, stats.Total)
if stats.Total == stats.Completed {
break
}
}
}
2 changes: 1 addition & 1 deletion queue/shuffled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *ShuffledQueueSuite) TestGetMethodRetrieves() {
jReturn, ok = s.queue.Get(j.ID())
s.True(ok)
s.Exactly(jReturn, j)
s.queue.Wait()
amboy.Wait(s.queue)

jReturn, ok = s.queue.Get(j.ID())
s.True(ok)
Expand Down
6 changes: 3 additions & 3 deletions queue/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func runUnorderedSmokeTest(ctx context.Context, q amboy.Queue, size int, assert
wg.Wait()

assert.Equal(numJobs, q.Stats().Total, fmt.Sprintf("with %d workers", size))
q.Wait()
amboy.Wait(q)

grip.Infof("workers complete for %d worker smoke test", size)
assert.Equal(numJobs, q.Stats().Completed, fmt.Sprintf("%+v", q.Stats()))
Expand Down Expand Up @@ -105,8 +105,8 @@ func runMultiQueueSingleBackEndSmokeTest(ctx context.Context, qOne, qTwo amboy.Q
grip.Infof("before wait statsTwo: %+v", statsTwo)

// wait for all jobs to complete.
qOne.Wait()
qTwo.Wait()
amboy.Wait(qOne)
amboy.Wait(qTwo)

grip.Infof("after wait statsOne: %+v", qOne.Stats())
grip.Infof("after wait statsTwo: %+v", qTwo.Stats())
Expand Down
13 changes: 0 additions & 13 deletions queue/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/mongodb/amboy/pool"
"github.com/pkg/errors"
"github.com/tychoish/grip"
"github.com/tychoish/grip/sometimes"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -211,15 +210,3 @@ func (q *LocalUnordered) Complete(ctx context.Context, j amboy.Job) {
q.numCompleted++
}()
}

// Wait blocks until all currently pending jobs jobs have completed.
func (q *LocalUnordered) Wait() {
for {
stats := q.Stats()
grip.DebugWhenf(sometimes.Fifth(),
"waiting for %d pending jobs (total=%d)", stats.Pending, stats.Total)
if stats.Pending == 0 {
break
}
}
}
7 changes: 4 additions & 3 deletions queue/unordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"testing"

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/job"
"github.com/mongodb/amboy/pool"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (s *LocalQueueSuite) TestResultsChannelProducesPointersToConsistentJobObjec
s.NoError(s.queue.Start(ctx))
s.NoError(s.queue.Put(job))

s.queue.Wait()
amboy.Wait(s.queue)

result, ok := <-s.queue.Results()
s.True(ok)
Expand All @@ -116,7 +117,7 @@ func (s *LocalQueueSuite) TestJobsChannelProducesJobObjects() {
s.NoError(s.queue.Put(job))
}

s.queue.Wait()
amboy.Wait(s.queue)

for j := range s.queue.Results() {
shellJob, ok := j.(*job.ShellJob)
Expand Down Expand Up @@ -156,7 +157,7 @@ func (s *LocalQueueSuite) TestQueueCanOnlyBeStartedOnce() {
s.NoError(s.queue.Start(ctx))
s.True(s.queue.Started())

s.queue.Wait()
amboy.Wait(s.queue)
s.True(s.queue.Started())

// you can call start more than once until the queue has
Expand Down
11 changes: 6 additions & 5 deletions rest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/job"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -324,7 +325,7 @@ func (s *ClientSuite) TestGetStatsHelperWithActualJob() {
j := job.NewShellJob("true", "")

s.NoError(s.service.queue.Put(j))
s.service.queue.Wait()
amboy.Wait(s.service.queue)

st, err := s.client.getStats(ctx)
s.NoError(err, fmt.Sprintf("%+v", st))
Expand Down Expand Up @@ -377,7 +378,7 @@ func (s *ClientSuite) TestJobStatusWithValidJob() {

j := job.NewShellJob("echo foo", "")
s.NoError(s.service.queue.Put(j))
s.service.queue.Wait()
amboy.Wait(s.service.queue)
st, err := s.client.jobStatus(ctx, j.ID())
s.NoError(err)
s.Equal(j.ID(), st.ID)
Expand Down Expand Up @@ -480,7 +481,7 @@ func (s *ClientSuite) TestPendingJobsWithCanceledContextReturnsError() {
}

func (s *ClientSuite) TestPendingJobsIsZeroAfterWaitingOnTheQueue() {
s.service.queue.Wait()
amboy.Wait(s.service.queue)
var err error
ctx := context.Background()
s.client, err = NewClient(s.info.host, s.info.port, "")
Expand All @@ -506,7 +507,7 @@ func (s *ClientSuite) TestJobCompleteWithCanceledContextReturnsError() {
}

func (s *ClientSuite) TestJobCompleteIsZeroAfterWaitingOnTheQueue() {
s.service.queue.Wait()
amboy.Wait(s.service.queue)
var err error
ctx := context.Background()
s.client, err = NewClient(s.info.host, s.info.port, "")
Expand All @@ -515,7 +516,7 @@ func (s *ClientSuite) TestJobCompleteIsZeroAfterWaitingOnTheQueue() {
j := job.NewShellJob("echo foo", "")
s.NoError(s.service.queue.Put(j))

s.service.queue.Wait()
amboy.Wait(s.service.queue)

isComplete, err := s.client.JobComplete(ctx, j.ID())
s.True(isComplete)
Expand Down
3 changes: 2 additions & 1 deletion rest/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"testing"

"github.com/mongodb/amboy"
"github.com/mongodb/amboy/job"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (s *JobStatusSuite) TestIncorrectOrInvalidJobNamesReturnExpectedResults() {
}

func (s *JobStatusSuite) TestJobNameReturnsSuccessfulResponse() {
s.service.queue.Wait()
amboy.Wait(s.service.queue)

resp, err := s.service.getJobStatusResponse(s.jobName)
s.NoError(err)
Expand Down
Loading

0 comments on commit 3c6a620

Please sign in to comment.