Skip to content

Commit

Permalink
Merge 70a865d into 46258a9
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Jul 16, 2018
2 parents 46258a9 + 70a865d commit ddb4dd0
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 62 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ before_script:

script:
# To start, run all the non-integration tests.
- MODULES="cloud/tq cmd/gardener dedup dispatch reproc state"
- MODULES="dedup dispatch cloud/tq cmd/gardener reproc state"
- for module in $MODULES; do
COVER_PKGS=${COVER_PKGS}./$module/..., ;
done
Expand All @@ -86,7 +86,7 @@ script:
# Note: we do not run integration tests from forked PRs b/c the SA is unavailable.
# Note that for modules in subdirectories, this replaces separating slashes with _.
- if [[ -n "$SERVICE_ACCOUNT_mlab_testing" ]] ; then
for module in dedup dispatch cloud/tq cmd/gardener reproc; do
for module in dedup dispatch cloud/tq cmd/gardener reproc state; do
go test -v -coverpkg=$COVER_PKGS -coverprofile=${module//\//_}.cov github.com/m-lab/etl-gardener/$module -tags=integration ;
EC=$[ $EC || $? ] ;
done ;
Expand Down
3 changes: 3 additions & 0 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func TestSaver(t *testing.T) {
if err != nil {
t.Fatal(err)
}

// Cleanup.
task.Delete()
}

func TestDispatcherLifeCycle(t *testing.T) {
Expand Down
47 changes: 33 additions & 14 deletions reproc/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package reproc

import (
"errors"
"log"
"sync"

"github.com/m-lab/etl-gardener/state"
Expand Down Expand Up @@ -61,46 +62,64 @@ func NewTerminator() *Terminator {
// It is responsible for starting tasks, recycling queues, and handling the
// termination signal.
type TaskHandler struct {
taskQueues chan string // Channel through which queues recycled.
exec state.Executor // Executor passed to new tasks
taskQueues chan string // Channel through which queues recycled.
saver state.Saver // The Saver used to save task states.

// For managing termination.
*Terminator
}

// NewTaskHandler creates a new TaskHandler.
func NewTaskHandler(queues []string) *TaskHandler {
func NewTaskHandler(exec state.Executor, queues []string, saver state.Saver) *TaskHandler {
// Create taskQueue channel, and preload with queues.
taskQueues := make(chan string, len(queues))
for _, q := range queues {
taskQueues <- q
}

return &TaskHandler{taskQueues, NewTerminator()}
return &TaskHandler{exec, taskQueues, saver, NewTerminator()}
}

// ErrTerminating is returned e.g. by AddTask, when tracker is terminating.
var ErrTerminating = errors.New("TaskHandler is terminating")

// StartTask starts a single task. It should be properly initialized except for saver.
func (th *TaskHandler) StartTask(t state.Task) {
t.SetSaver(th.saver)

// WARNING: There is a race here when terminating, if a task gets
// a queue here and calls Add(). This races with the thread that started
// the termination and calls Wait().
th.Add(1)
// We pass a function to Process that it should call when finished
// using the queue (when queue has drained. Since this runs in its own
// go routine, we need to avoid closing the taskQueues channel, which
// could then cause panics.
doneWithQueue := func() {
log.Println("Returning", t.Queue)
th.taskQueues <- t.Queue
}
go t.Process(th.exec, doneWithQueue, th.Terminator)
}

// AddTask adds a new task, blocking until the task has been accepted.
// This will typically be repeated called by another goroutine responsible
// for driving the reprocessing.
// May return ErrTerminating, if th has started termination.
// TODO: Add prometheus metrics.
func (th *TaskHandler) AddTask(prefix string) error {
log.Println("Waiting for a queue")
select {
// Wait until there is an available task queue.
case queue := <-th.taskQueues:
t := state.Task{Name: prefix, Queue: queue, State: state.Initializing}

// WARNING: There is a race here when terminating, if a task gets
// a queue here and calls Add(). This races with the thread that started
// the termination and calls Wait().
th.Add(1)
// We are passing taskQueues to Process, so that it can recycle
// its taskQueue when it is empty. Since this runs in its own
// go routine, we need to avoid closing the taskQueues channel, which
// could then cause panics.
go t.Process(th.taskQueues, th.Terminator)
log.Println("Got a queue", queue)
t, err := state.NewTask(prefix, queue, nil)
if err != nil {
log.Println(err)
return err
}
th.StartTask(*t)
return nil

// Or until we start termination.
Expand Down
49 changes: 41 additions & 8 deletions reproc/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,46 @@ func TestTerminator(t *testing.T) {
trm.Wait()
}

type Exec struct{}

func (ex *Exec) Next(t *state.Task, terminate <-chan struct{}) {
log.Println("Do", t)
time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)

switch t.State {
case state.Invalid:
t.Update(state.Initializing)
case state.Initializing:
t.Update(state.Queuing)
case state.Queuing:
t.Update(state.Processing)
case state.Processing:
t.Queue = "" // No longer need to keep the queue.
t.Update(state.Stabilizing)
case state.Stabilizing:
t.Update(state.Deduplicating)
case state.Deduplicating:
t.Update(state.Finishing)
case state.Finishing:
t.Update(state.Done)
case state.Done:
// Generally shouldn't happen.
// In prod, we would ignore this, but for test we log.Fatal to force
// a test failure.
log.Fatal("Should not call Next when state is Done")
}
}

func AssertExecutor() { func(ex state.Executor) {}(&Exec{}) }

// This test exercises the task management, including invoking t.Process().
// It does not check any state, but if the termination does not work properly,
// may fail to complete. Also, running with -race may detect race
// conditions.
func TestBasic(t *testing.T) {
// Start tracker with no queues.
th := reproc.NewTaskHandler([]string{})
exec := Exec{}
th := reproc.NewTaskHandler(&exec, []string{}, nil)

// This will block because there are no queues.
go th.AddTask("foobar")
Expand All @@ -60,17 +93,17 @@ func TestBasic(t *testing.T) {
}

// This test exercises the task management, including invoking t.Process().
// It does not check any state, but if the termination does not work properly,
// It does not check any state, but if the termination does not work properly,
// may fail to complete. Also, running with -race may detect race
// conditions.
func TestWithTaskQueue(t *testing.T) {
// Start tracker with no queues.
th := reproc.NewTaskHandler([]string{"queue-1"})

th.AddTask("a")
// Start tracker with one queue.
exec := Exec{}
th := reproc.NewTaskHandler(&exec, []string{"queue-1"}, nil)
th.AddTask("gs://fake/ndt/2017/09/22/")

go th.AddTask("b")
go th.AddTask("c")
go th.AddTask("gs://fake/ndt/2017/09/24/")
go th.AddTask("gs://fake/ndt/2017/09/26/")

time.Sleep(15 * time.Millisecond)
th.Terminate()
Expand Down
52 changes: 34 additions & 18 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"fmt"
"io"
"log"
"math/rand"
"regexp"
"strings"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/datastore"
"github.com/m-lab/etl-gardener/metrics"
"github.com/m-lab/go/bqext"
)

// State indicates the state of a single Task in flight.
Expand Down Expand Up @@ -46,16 +49,14 @@ var StateNames = map[State]string{
var (
ErrInvalidQueue = errors.New("invalid queue")
ErrTaskSuspended = errors.New("task suspended")
ErrTableNotFound = errors.New("Not found: Table")
)

// Executor describes an object that can do all the required steps to execute a Task.
// Used for mocking.
// Interface must update the task in place, so that state changes are all visible.
type Executor interface {
// Advance to the next state.
AdvanceState(task *Task)
// Advance to the next state.
DoAction(task *Task, terminate <-chan struct{})
Next(task *Task, terminate <-chan struct{})
}

// Saver provides API for saving Task state.
Expand Down Expand Up @@ -164,6 +165,21 @@ func (t *Task) ParsePrefix() ([]string, error) {
return fields, nil
}

// SourceAndDest creates BQ Table entities for the source templated table, and destination partition.
func (t *Task) SourceAndDest(ds *bqext.Dataset) (*bigquery.Table, *bigquery.Table, error) {
// Launch the dedup request, and save the JobID
parts, err := t.ParsePrefix()
if err != nil {
// If there is a parse error, log and skip request.
metrics.FailCount.WithLabelValues("BadDedupPrefix")
return nil, nil, err
}

src := ds.Table(parts[2] + "_" + strings.Join(strings.Split(parts[3], "/"), ""))
dest := ds.Table(parts[2] + "$" + strings.Join(strings.Split(parts[3], "/"), ""))
return src, dest, nil
}

func (t Task) String() string {
return fmt.Sprintf("{%s: %s, Q:%s, J:%s, E:%s (%s)}", t.Name, StateNames[t.State], t.Queue, t.JobID, t.ErrMsg, t.ErrInfo)
}
Expand All @@ -182,11 +198,11 @@ func (t *Task) Save() error {

// Update updates the task state, and saves to the "saver".
func (t *Task) Update(st State) error {
t.State = st
t.UpdateTime = time.Now()
if t.saver == nil {
return ErrNoSaver
}
t.State = st
t.UpdateTime = time.Now()
return t.saver.SaveTask(*t)
}

Expand Down Expand Up @@ -229,20 +245,20 @@ type Terminator interface {
func nop() {}

// Process handles all steps of processing a task.
// TODO: Real implementation. This is a dummy implementation, to support testing TaskHandler.
func (t *Task) Process(tq chan<- string, term Terminator) {
select {
case <-time.After(time.Duration(1+rand.Intn(10)) * time.Millisecond):
tq <- t.Queue
// Wait until one of these...
func (t Task) Process(ex Executor, doneWithQueue func(), term Terminator) {
log.Println("Starting:", t.Name)
loop:
for t.State != Done { //&& t.err == nil {
select {
case <-time.After(time.Duration(1+rand.Intn(10)) * time.Millisecond):
nop()
case <-term.GetNotifyChannel():
nop()
t.SetError(ErrTaskSuspended, "Terminating")
break loop
default:
ex.Next(&t, term.GetNotifyChannel())
if t.State == Stabilizing {
doneWithQueue()
}
}
case <-term.GetNotifyChannel():
nop()
}
term.Done()
}
Expand Down
54 changes: 35 additions & 19 deletions state/state_bb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,33 @@ import (
"github.com/m-lab/etl-gardener/state"
)

func waitForNTasks(t *testing.T, saver *state.DatastoreSaver, expectedTaskCount int) []state.Task {
var tasks []state.Task
var err error
for i := 0; i < 10; i++ {
// Real datastore takes about 100 msec or more before consistency.
// In travis, we use the emulator, which should provide consistency
// much more quickly. We use a modest number here that usually
// is sufficient for running on workstation, and rarely fail with emulator.
// Then we retry up to 10 times, before actually failing.
time.Sleep(200 * time.Millisecond)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
tasks, err = saver.GetStatus(ctx)
if err != nil {
t.Fatal(err)
}
if ctx.Err() != nil {
t.Fatal(ctx.Err())
}
if len(tasks) == expectedTaskCount {
break
}
}
return tasks
}

func TestStatus(t *testing.T) {
saver, err := state.NewDatastoreSaver("mlab-testing")
if err != nil {
Expand All @@ -28,30 +55,17 @@ func TestStatus(t *testing.T) {
if err != nil {
t.Fatal(err)
}
task.Name = "task2"
task.Name = "gs://foo/bar/2000/01/01/task2"
task.Queue = "Q2"
err = task.Update(state.Queuing)
if err != nil {
t.Fatal(err)
}

// Real datastore takes about 100 msec or more before consistency.
// In travis, we use the emulator, which should provide consistency
// much more quickly. So we use a modest number here that usually
// is sufficient for running on workstation.
time.Sleep(200 * time.Millisecond)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
tasks, err := saver.GetStatus(ctx)
if err != nil {
t.Fatal(err)
}
if ctx.Err() != nil {
t.Fatal(ctx.Err())
}
if len(tasks) != 2 {
t.Error("Should be 2 tasks (see notes on consistency", len(tasks))
ExpectedTasks := 2
tasks := waitForNTasks(t, saver, ExpectedTasks)
if len(tasks) != ExpectedTasks {
t.Errorf("Saw %d tasks instead of %d (see notes on consistency)", len(tasks), ExpectedTasks)
for _, t := range tasks {
log.Println(t)
}
Expand Down Expand Up @@ -80,7 +94,9 @@ func TestWriteStatus(t *testing.T) {
task.Queue = "Q2"
task.Update(state.Queuing)
t2 := task
time.Sleep(200 * time.Millisecond)

ExpectedTasks := 2
waitForNTasks(t, saver, ExpectedTasks)

bb := make([]byte, 0, 500)
buf := bytes.NewBuffer(bb)
Expand Down

0 comments on commit ddb4dd0

Please sign in to comment.