Skip to content

Commit

Permalink
various small fixes, separate tracker_race_test
Browse files Browse the repository at this point in the history
  • Loading branch information
Gregory Russell committed Dec 2, 2019
1 parent 49ccd8e commit fa9dca7
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 71 deletions.
5 changes: 3 additions & 2 deletions tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Job struct {
}

// NewJob creates a new job object.
// NB: The date will be converted to UTC and truncated to day boundary!
func NewJob(bucket, exp, typ string, date time.Time) Job {
return Job{Bucket: bucket,
Experiment: exp,
Expand Down Expand Up @@ -87,7 +88,6 @@ func (j Status) isDone() bool {
}

// NewStatus creates a new Status with provided parameters.
// NB: The date will be converted to UTC and truncated to day boundary!
func NewStatus() Status {
return Status{
State: Init,
Expand All @@ -97,6 +97,7 @@ func NewStatus() Status {

// JobMap is defined to allow custom json marshal/unmarshal.
// It defines the map from Job to Status.
// TODO implement datastore.PropertyLoadSaver
type JobMap map[Job]Status

// MarshalJSON implements json.Marshal
Expand All @@ -116,6 +117,7 @@ func (jobs JobMap) MarshalJSON() ([]byte, error) {
}

// UnmarshalJSON implements json.UnmarshalJSON
// jobs and data should be non-nil.
func (jobs *JobMap) UnmarshalJSON(data []byte) error {
type Pair struct {
Job Job
Expand All @@ -127,7 +129,6 @@ func (jobs *JobMap) UnmarshalJSON(data []byte) error {
return err
}

//jobs = make(map[Job]*Status, len(pairs))
for i := range pairs {
(*jobs)[pairs[i].Job] = pairs[i].State
}
Expand Down
4 changes: 2 additions & 2 deletions tracker/tracker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestWithDatastore(t *testing.T) {
}

log.Println("Calling Sync")
must(t, tk.Sync()) // This causes invalid entity type
must(t, tk.Sync())
// Check that the sync (and InitTracker) work.
restore, err := tracker.InitTracker(context.Background(), client, dsKey, 0)
must(t, err)
Expand All @@ -50,7 +50,7 @@ func TestWithDatastore(t *testing.T) {

completeJobs(t, tk, "500Jobs", "type", numJobs)

tk.Sync()
must(t, tk.Sync())

if tk.NumJobs() != 0 {
t.Error("Job cleanup failed", tk.NumJobs())
Expand Down
83 changes: 83 additions & 0 deletions tracker/tracker_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// +build race

package tracker_test

import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"testing"
"time"

"cloud.google.com/go/datastore"
"github.com/m-lab/etl-gardener/tracker"
)

func TestConcurrentUpdates(t *testing.T) {
	// This test exercises job updates at a high rate, and ensures
	// that there are no races.  It should be run with -race to
	// detect any concurrency problems.
	if testing.Short() {
		t.Skip("Skipping for -short")
	}

	client := newTestClient()
	dsKey := datastore.NameKey("TestConcurrentUpdates", "jobs", nil)
	dsKey.Namespace = "gardener"
	defer must(t, cleanup(client, dsKey))

	// For testing, push to the saver every 5 milliseconds.
	saverInterval := 5 * time.Millisecond
	tk, err := tracker.InitTracker(context.Background(), client, dsKey, saverInterval)
	must(t, err)

	jobs := 20
	createJobs(t, tk, "ConcurrentUpdates", "type", jobs)

	changes := 20 * jobs
	start := time.Now()
	wg := sync.WaitGroup{}
	wg.Add(changes)
	// Execute a large number of concurrent updates and heartbeats.
	for i := 0; i < changes; i++ {
		go func(i int) {
			defer wg.Done()
			// NOTE(review): unkeyed literal assumes Job field order is
			// (bucket, experiment, type, date), matching NewJob's
			// parameter order — confirm against the Job declaration.
			k := tracker.Job{"bucket", "ConcurrentUpdates", "type",
				startDate.Add(time.Duration(24*rand.Intn(jobs)) * time.Hour)}
			if i%5 == 0 {
				// t.Error is safe to call from a goroutine; log.Fatal
				// would call os.Exit, skipping the deferred cleanup and
				// killing the entire test binary.
				if err := tk.SetStatus(k, tracker.State(fmt.Sprintf("State:%d", i))); err != nil {
					t.Error(err, " ", k)
				}
			} else {
				if err := tk.Heartbeat(k); err != nil {
					t.Error(err, " ", k)
				}
			}
		}(i)
		// Brief stagger so updates overlap the periodic saver pushes.
		time.Sleep(200 * time.Microsecond)
	}
	wg.Wait()
	elapsed := time.Since(start)
	if elapsed > 2*time.Second {
		t.Error("Expected elapsed time < 2 seconds", elapsed)
	}

	// Change to true to dump the final state.
	if false {
		// Check the Sync error, consistent with the integration test.
		must(t, tk.Sync())
		restore, err := tracker.InitTracker(context.Background(), client, dsKey, 0)
		must(t, err)

		status := restore.GetAll()
		for k, v := range status {
			log.Println(k, v)
		}

		t.Fail()
	}
}
69 changes: 2 additions & 67 deletions tracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package tracker_test

import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -160,7 +158,7 @@ func TestNonexistentJobAccess(t *testing.T) {
t.Error("Should be ErrJobNotFound", err)
}
js := tracker.NewJob("bucket", "exp", "type", startDate)
err = tk.AddJob(js)
must(t, tk.AddJob(js))
if err != nil {
t.Error(err)
}
Expand All @@ -170,74 +168,11 @@ func TestNonexistentJobAccess(t *testing.T) {
t.Error("Should be ErrJobAlreadyExists", err)
}

tk.SetStatus(js, tracker.Complete)
must(t, tk.SetStatus(js, tracker.Complete))

// Job should be gone now.
err = tk.SetStatus(js, "foobar")
if err != tracker.ErrJobNotFound {
t.Error("Should be ErrJobNotFound", err)
}
}

func TestConcurrentUpdates(t *testing.T) {
// The test is intended to exercise job updates at a high
// rate, and ensure that there are no races.
// It should be run with -race to detect any concurrency
// problems.
client := newTestClient()
dsKey := datastore.NameKey("TestConcurrentUpdates", "jobs", nil)
dsKey.Namespace = "gardener"
defer must(t, cleanup(client, dsKey))

// For testing, push to the saver every 5 milliseconds.
saverInterval := 5 * time.Millisecond
tk, err := tracker.InitTracker(context.Background(), client, dsKey, saverInterval)
must(t, err)

jobs := 20
createJobs(t, tk, "ConcurrentUpdates", "type", jobs)

changes := 20 * jobs
start := time.Now()
wg := sync.WaitGroup{}
wg.Add(changes)
// Execute large number of concurrent updates and heartbeats.
for i := 0; i < changes; i++ {
go func(i int) {
k := tracker.Job{"bucket", "ConcurrentUpdates", "type",
startDate.Add(time.Duration(24*rand.Intn(jobs)) * time.Hour)}
if i%5 == 0 {
err := tk.SetStatus(k, tracker.State(fmt.Sprintf("State:%d", i)))
if err != nil {
log.Fatal(err, " ", k)
}
} else {
err := tk.Heartbeat(k)
if err != nil {
log.Fatal(err, " ", k)
}
}
wg.Done()
}(i)
time.Sleep(200 * time.Microsecond)
}
wg.Wait()
elapsed := time.Since(start)
if elapsed > 2*time.Second {
t.Error("Expected elapsed time < 2 seconds", elapsed)
}

// Change to true to dump the final state.
if false {
tk.Sync()
restore, err := tracker.InitTracker(context.Background(), client, dsKey, 0)
must(t, err)

status := restore.GetAll()
for k, v := range status {
log.Println(k, v)
}

t.Fail()
}
}

0 comments on commit fa9dca7

Please sign in to comment.