Skip to content

Commit

Permalink
Add dispatcher lifecycle test
Browse files Browse the repository at this point in the history
  • Loading branch information
Gregory Russell committed Mar 5, 2018
1 parent f800f78 commit 5231173
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 18 deletions.
21 changes: 7 additions & 14 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"os"
"reflect"
"time"

"google.golang.org/api/option"
)

// DESIGN:
Expand All @@ -26,26 +28,15 @@ type Dispatcher struct {
StartDate time.Time
}

<<<<<<< HEAD
// NewDispatcher creates a proof of concept dispatcher
// hard coded to mlab-testing. For testing / proof of concept only.
// TODO - replace with functional code.
func NewDispatcher() (*Dispatcher, error) {
queues := make([]chan<- string, 0, 4)
done := make([]<-chan bool, 0, 4)
for i := 0; i < 4; i++ {
q, d, err := NewChannelQueueHandler(http.DefaultClient, "mlab-testing",
fmt.Sprintf("test-queue-%d", i))
=======
// NewDispatcher creates a dispatcher that will spread requests across multiple
// QueueHandlers.
func NewDispatcher(httpClient *http.Client, project, queueBase string, numQueues int, startDate time.Time) (*Dispatcher, error) {
// bucketOpts may be used to provide a fake client for bucket operations.
func NewDispatcher(httpClient *http.Client, project, queueBase string, numQueues int, startDate time.Time, bucketOpts ...option.ClientOption) (*Dispatcher, error) {
queues := make([]chan<- string, 0, numQueues)
done := make([]<-chan bool, 0, numQueues)
for i := 0; i < numQueues; i++ {
q, d, err := NewChannelQueueHandler(httpClient, project,
fmt.Sprintf("%s%d", queueBase, i))
>>>>>>> b63e48d... More realistic dispatcher
fmt.Sprintf("%s%d", queueBase, i), bucketOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -76,6 +67,8 @@ func (disp *Dispatcher) Add(prefix string) {
Chan: reflect.ValueOf(disp.Queues[i]), Send: reflect.ValueOf(prefix)}
cases = append(cases, c)
}
// TODO - check for panic if channels are closed?
// or just check if Kill has been called?
reflect.Select(cases)
}

Expand Down
47 changes: 47 additions & 0 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package dispatch_test

import (
"log"
"testing"
"time"

"google.golang.org/api/option"

"github.com/m-lab/etl-gardener/cloud/tq"
"github.com/m-lab/etl-gardener/dispatch"
)

func init() {
// Always prepend the filename and line number.
log.SetFlags(log.LstdFlags | log.Lshortfile)
}

func TestDispatcherLifeCycle(t *testing.T) {
// Use a fake client so we intercept all the http ops.
client, counter := tq.DryRunQueuerClient()

// With time.Now(), this shouldn't send any requests.
// Inject fake client for bucket ops, so it doesn't hit the backends at all.
d, err := dispatch.NewDispatcher(client, "project", "queue-base-", 4, time.Now(), option.WithHTTPClient(client))
if err != nil {
t.Fatal(err)
}

d.Add("gs://foobar/ndt/2001/01/01/")

// This waits for all work to complete, then closes all channels.
d.Kill()

// Test prefix should have triggered a single task queue check, so count should be 2.
if counter.Count() != 2 {
t.Errorf("Count was %d instead of 2", counter.Count())
}

// Now channels should be closed, and new Add should panic.
defer func() {
if r := recover(); r == nil {
t.Error("Should panic")
}
}()
d.Add("gs://foobar/ndt/2001/01/01/")
}
11 changes: 7 additions & 4 deletions dispatch/queuehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"regexp"
"time"

"google.golang.org/api/option"

"github.com/m-lab/etl-gardener/cloud/tq"
)

// ChannelQueueHandler is an autonomous queue handler running in a go
// routine, fed by a channel.
type ChannelQueueHandler struct {
*tq.QueueHandler
// Handler listens on this channel for prefixes.
Channel chan string
}

Expand Down Expand Up @@ -43,7 +46,7 @@ func parsePrefix(prefix string) ([]string, error) {

// StartHandleLoop starts a go routine that waits for work on channel, and
// processes it. Returns a channel that will send true when input channel is closed.
func (chq *ChannelQueueHandler) StartHandleLoop() <-chan bool {
func (chq *ChannelQueueHandler) StartHandleLoop(bucketOpts ...option.ClientOption) <-chan bool {
done := make(chan bool)
go func() {
for {
Expand Down Expand Up @@ -73,7 +76,7 @@ func (chq *ChannelQueueHandler) StartHandleLoop() <-chan bool {

log.Println(parts)
bucketName := parts[1]
bucket, err := tq.GetBucket(nil, chq.Project, bucketName, false)
bucket, err := tq.GetBucket(bucketOpts, chq.Project, bucketName, false)
if err != nil {
log.Println(err)
continue
Expand All @@ -92,14 +95,14 @@ func (chq *ChannelQueueHandler) StartHandleLoop() <-chan bool {
// from a channel.
// Returns feeding channel, and done channel, which will return true when
// feeding channel is closed, and processing is complete.
func NewChannelQueueHandler(httpClient *http.Client, project, queue string) (chan<- string, <-chan bool, error) {
func NewChannelQueueHandler(httpClient *http.Client, project, queue string, bucketOpts ...option.ClientOption) (chan<- string, <-chan bool, error) {
qh, err := tq.NewQueueHandler(httpClient, project, queue)
if err != nil {
return nil, nil, err
}
ch := make(chan string)
cqh := ChannelQueueHandler{qh, ch}

done := cqh.StartHandleLoop()
done := cqh.StartHandleLoop(bucketOpts...)
return ch, done, nil
}

0 comments on commit 5231173

Please sign in to comment.