Skip to content

Commit

Permalink
fix more merge bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed May 30, 2018
1 parent 8c26275 commit 60e69e7
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 32 deletions.
3 changes: 3 additions & 0 deletions dispatch/deduphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ ErrorTimeout:
case err == nil:
// Restart the timer whenever Metadata succeeds.
errorDeadline = time.Now().Add(2 * time.Minute)
if os.Getenv("UNIT_TEST_MODE") != "" {
errorDeadline = time.Now().Add(time.Second)
}
if meta.StreamingBuffer == nil {
// Buffer is empty, so we can move on.
return nil
Expand Down
14 changes: 5 additions & 9 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func (s *S) SaveTask(t state.Task) error {
return nil
}

func (s *S) DeleteTask(t state.Task) error { return nil }
func (s *S) DeleteTask(t state.Task) error { return nil }
func (s *S) SaveSystem(ss *state.SystemState) error { return nil }

func assertSaver() { func(ex state.Saver) {}(&S{}) }

Expand All @@ -48,11 +49,6 @@ func xTestSaver(t *testing.T) {
}
}

func (s *S) DeleteTask(t state.Task) error { return nil }
func (s *S) SaveSystem(ss *state.SystemState) error { return nil }

func assertSaver() { func(ex state.Saver) {}(&S{}) }

func TestDispatcherLifeCycle(t *testing.T) {
os.Setenv("GCLOUD_PROJECT", "mlab-testing")
os.Setenv("UNIT_TEST_MODE", "true")
Expand Down Expand Up @@ -93,7 +89,7 @@ func TestDispatcherLifeCycle(t *testing.T) {
t.Fatal("Task not saved")
}
// For now, state should have progressed to Stabilizing.
if len(taskStates) != 6 {
if len(taskStates) != 7 {
t.Fatal("Wrong number of states:", len(taskStates))
}

Expand All @@ -109,7 +105,7 @@ func TestDispatcherLifeCycle(t *testing.T) {
t.Errorf("Wrong state %+v\n", taskStates[3])
}

if taskStates[5].State != state.Done {
t.Errorf("Wrong state %+v\n", taskStates[5])
if taskStates[6].State != state.Done {
t.Errorf("Wrong state %+v\n", taskStates[6])
}
}
36 changes: 19 additions & 17 deletions rex/rex.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ func (rex *ReprocessingExecutor) DoAction(t *state.Task, terminate <-chan struct
qh, err := tq.NewQueueHandler(rex.Client, rex.Project, t.Queue)
if err != nil {
metrics.FailCount.WithLabelValues("NewDataset")
t.Err = err
t.SetError(err, "NewDataset")
return
}
log.Println("Wait for empty queue ", qh.Queue)
for err := qh.IsEmpty(); err != nil; err = qh.IsEmpty() {
select {
case <-terminate:
t.Err = ErrTaskSuspended
t.SetError(ErrTaskSuspended, "Terminating")
return
default:
}
Expand All @@ -129,8 +129,9 @@ func (rex *ReprocessingExecutor) DoAction(t *state.Task, terminate <-chan struct
}
case state.Stabilizing:
// Wait for the streaming buffer to be nil.
t.Err = dispatch.WaitForStableTable(nil)
if t.Err != nil {
err := dispatch.WaitForStableTable(nil)
if err != nil {
t.SetError(err, "WaitForStableTable")
}
case state.Deduplicating:
rex.dedup(t, terminate)
Expand All @@ -146,32 +147,33 @@ func (rex *ReprocessingExecutor) queue(t *state.Task, terminate <-chan struct{})
//func (qh *ChannelQueueHandler) handleLoop(next api.BasicPipe, bucketOpts ...option.ClientOption) {
qh, err := tq.NewQueueHandler(rex.Client, rex.Project, t.Queue)
if err != nil {
// TODO - move all these metrics into SetError?
metrics.FailCount.WithLabelValues("NewDataset")
t.Err = err
t.SetError(err, "NewDataset")
return
}
parts, err := t.ParsePrefix()
if err != nil {
// If there is a parse error, log and skip request.
log.Println(err)
metrics.FailCount.WithLabelValues("BadPrefix").Inc()
t.Err = err
t.SetError(err, "BadPrefix")
return
}
bucketName := parts[1]
bucket, err := tq.GetBucket(rex.BucketOpts, rex.Project, bucketName, false)
if err != nil {
log.Println(err)
metrics.FailCount.WithLabelValues("BucketError").Inc()
t.Err = err
t.SetError(err, "BucketError")
return
}
// TODO maybe check terminate while queuing?
n, err := qh.PostDay(bucket, bucketName, parts[2]+"/"+parts[3]+"/")
if err != nil {
log.Println(err)
metrics.FailCount.WithLabelValues("PostDayError").Inc()
t.Err = err
t.SetError(err, "PostDayError")
return
}
log.Println("Added ", n, t.Name, " tasks to ", qh.Queue)
Expand All @@ -182,13 +184,13 @@ func (rex *ReprocessingExecutor) dedup(t *state.Task, terminate <-chan struct{})
ds, err := rex.GetDS()
if err != nil {
metrics.FailCount.WithLabelValues("NewDataset")
t.Err = err
t.SetError(err, "NewDataset")
return
}
src, dest, err := t.SourceAndDest(&ds)
if err != nil {
metrics.FailCount.WithLabelValues("BadDedupPrefix")
t.Err = err
t.SetError(err, "BadDedupPrefix")
return
}

Expand All @@ -201,7 +203,7 @@ func (rex *ReprocessingExecutor) dedup(t *state.Task, terminate <-chan struct{})
} else {
log.Println(err, src.FullyQualifiedName())
metrics.FailCount.WithLabelValues("DedupFailed")
t.Err = err
t.SetError(err, "DedupFailed")
return
}
}
Expand Down Expand Up @@ -248,19 +250,19 @@ func (rex *ReprocessingExecutor) finish(t *state.Task, terminate <-chan struct{}
ds, err := bqext.NewDataset(rex.Project, rex.Dataset, rex.Options...)
if err != nil {
metrics.FailCount.WithLabelValues("NewDataset")
t.Err = err
t.SetError(err, "NewDataset")
return
}
src, _, err := t.SourceAndDest(&ds)
if err != nil {
metrics.FailCount.WithLabelValues("BadDedupPrefix")
t.Err = err
metrics.FailCount.WithLabelValues("SourceAndDest")
t.SetError(err, "SourceAndDest")
return
}
job, err := ds.BqClient.JobFromID(context.Background(), t.JobID)
if err != nil {
metrics.FailCount.WithLabelValues("JobFromID")
t.Err = err
t.SetError(err, "JobFromID")
return
}
// TODO - should loop, and check terminate channel
Expand All @@ -270,7 +272,7 @@ func (rex *ReprocessingExecutor) finish(t *state.Task, terminate <-chan struct{}
if err != ErrTaskSuspended {
log.Println(status.Err(), src.FullyQualifiedName())
metrics.FailCount.WithLabelValues("DedupError")
t.Err = err
t.SetError(err, "DedupError")
}
return
}
Expand All @@ -287,8 +289,8 @@ func (rex *ReprocessingExecutor) finish(t *state.Task, terminate <-chan struct{}
}
if ctx.Err() != nil {
if ctx.Err() != context.DeadlineExceeded {
t.Err = ctx.Err()
metrics.FailCount.WithLabelValues("TableDeleteTimeout")
t.SetError(ctx.Err(), "TableDeleteTimeout")
log.Println(ctx.Err())
return
}
Expand Down
8 changes: 4 additions & 4 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,16 @@ func (task Task) Process(h Helpers) {
break
}

h.ex.AdvanceState(&t)
h.ex.AdvanceState(&task)
select {
case <-h.terminate:
task.err = ErrTaskSuspended
default:
}
}
log.Printf("%+v\n", t)
h.updater <- t // Tell the dispatcher what we are doing (async)
h.saver.SaveTask(t)
log.Printf("%+v\n", task)
h.updater <- task // Tell the dispatcher what we are doing (async)
h.saver.SaveTask(task)
}

// SystemState holds the high level state of the reprocessing dispatcher.
Expand Down
2 changes: 0 additions & 2 deletions state/state_bb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ func TestBasic(t *testing.T) {
}
go ss.DoDispatchLoop("Fake", []string{"ndt"}, time.Now().Add(-10*24*time.Hour))

state.GetQueueChan(ss) <- "Q1"
state.GetQueueChan(ss) <- "Q2"
time.Sleep(20 * time.Millisecond)
ss.Terminate()
ss.WaitForTerminate()
Expand Down
2 changes: 2 additions & 0 deletions state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func (s *testSaver) DeleteTask(t state.Task) error {
return nil
}

func (s *testSaver) SaveSystem(ss *state.SystemState) error { return nil }

func assertSaver() { func(ex state.Saver) {}(&testSaver{}) }

func TestTaskBasics(t *testing.T) {
Expand Down

0 comments on commit 60e69e7

Please sign in to comment.