Skip to content

Commit

Permalink
Merge 8d2dbbc into 87a3bb0
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-soltesz committed Jan 14, 2022
2 parents 87a3bb0 + 8d2dbbc commit 025909c
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 159 deletions.
9 changes: 4 additions & 5 deletions cmd/gardener/gardener.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func mustStandardTracker() *tracker.Tracker {
return tk
}

func mustCreateJobService(ctx context.Context, mux *http.ServeMux) {
func mustCreateJobService(ctx context.Context) *job.Service {
storageClient, err := storage.NewClient(ctx)
rtx.Must(err, "Could not create storage client for job service")

Expand All @@ -341,7 +341,7 @@ func mustCreateJobService(ctx context.Context, mux *http.ServeMux) {
os.Getenv("PROJECT"), config.Sources(), saver,
stiface.AdaptClient(storageClient))
rtx.Must(err, "Could not initialize job service")
mux.HandleFunc("/job", svc.JobHandler)
return svc
}

// ###############################################################################
Expand Down Expand Up @@ -410,11 +410,10 @@ func main() {
rtx.Must(err, "NewStandardMonitor failed")
go monitor.Watch(mainCtx, 5*time.Second)

handler := tracker.NewHandler(globalTracker)
js := mustCreateJobService(mainCtx)
handler := tracker.NewHandler(globalTracker, js)
handler.Register(mux)

mustCreateJobService(mainCtx, mux)

healthy = true
log.Println("Running as manager service")
case "legacy":
Expand Down
50 changes: 0 additions & 50 deletions job-service/job-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"errors"
"log"
"net/http"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -177,55 +176,6 @@ func (svc *Service) NextJob(ctx context.Context) tracker.JobWithTarget {
return job
}

// JobHandler handle requests for new jobs.
// TODO - should update tracker instance.
func (svc *Service) JobHandler(resp http.ResponseWriter, req *http.Request) {
// Must be a post because it changes state.
if req.Method != http.MethodPost {
resp.WriteHeader(http.StatusMethodNotAllowed)
return
}
job := svc.NextJob(req.Context())

// Check whether there are any files
if svc.sClient != nil {
ok, err := job.Job.HasFiles(req.Context(), svc.sClient)
if err != nil {
log.Println(err)
}
if !ok {
log.Println(job, "has no files", job.Bucket)
resp.WriteHeader(http.StatusInternalServerError)
_, err = resp.Write([]byte("Job has no files. Try again."))
if err != nil {
log.Println(err)
}
return
}
}

err := svc.jobAdder.AddJob(job.Job)
if err != nil {
log.Println(err, job)
resp.WriteHeader(http.StatusInternalServerError)
_, err = resp.Write([]byte("Job already exists. Try again."))
if err != nil {
log.Println(err)
}
return
}

log.Printf("Dispatching %s\n", job.Job)
_, err = resp.Write(job.Marshal())
if err != nil {
log.Println(err)
// This should precede the Write(), but the Write failed, so this
// is likely ok.
resp.WriteHeader(http.StatusInternalServerError)
return
}
}

// Recover the processing date.
// Not thread-safe - should be called before activating service.
func (svc *Service) recoverDate(ctx context.Context) {
Expand Down
101 changes: 10 additions & 91 deletions job-service/job-service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ import (
"encoding/json"
"errors"
"log"
"net/http"
"net/http/httptest"
"testing"
"time"

"bou.ke/monkey"
"cloud.google.com/go/datastore"
"cloud.google.com/go/storage"
"github.com/go-test/deep"

"github.com/m-lab/go/rtx"
Expand All @@ -24,8 +21,6 @@ import (
"github.com/m-lab/etl-gardener/job-service"
"github.com/m-lab/etl-gardener/persistence"
"github.com/m-lab/etl-gardener/tracker"

"github.com/m-lab/go/cloudtest/gcsfake"
)

func init() {
Expand Down Expand Up @@ -108,61 +103,6 @@ func TestService_NextJob(t *testing.T) {
}
}

func TestJobHandler(t *testing.T) {
fc := gcsfake.GCSClient{}
fc.AddTestBucket("fake-bucket",
&gcsfake.BucketHandle{
ObjAttrs: []*storage.ObjectAttrs{
{Name: "obj1", Updated: time.Now()},
{Name: "obj2", Updated: time.Now()},
{Name: "ndt/ndt5/2011/02/03/foobar.tgz", Size: 101, Updated: time.Now()},
}})

ctx := context.Background()

// Fake time will avoid yesterday trigger.
now := time.Date(2011, 2, 16, 1, 2, 3, 4, time.UTC)
monkey.Patch(time.Now, func() time.Time {
return now
})
defer monkey.Unpatch(time.Now)

sources := []config.SourceConfig{
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "tcpinfo", Target: "tmp_ndt.tcpinfo"},
{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", Target: "tmp_ndt.ndt5"},
}
start := time.Date(2011, 2, 3, 0, 0, 0, 0, time.UTC)
svc, err := job.NewJobService(ctx, &NullTracker{}, start, "fake-bucket", sources, &NullSaver{}, &fc)
must(t, err)
req := httptest.NewRequest("", "/job", nil)
resp := httptest.NewRecorder()
svc.JobHandler(resp, req)
if resp.Code != http.StatusMethodNotAllowed {
t.Error("Should be MethodNotAllowed", http.StatusText(resp.Code))
}

// This one should fail because there are no objects with tcpinfo prefix.
req = httptest.NewRequest("POST", "/job", nil)
resp = httptest.NewRecorder()
svc.JobHandler(resp, req)
if resp.Code != http.StatusInternalServerError {
t.Error("Should be InternalServerError", http.StatusText(resp.Code), resp.Body.String())
}

// This should succeed, because the service advanced to ndt5/2011/02/03/
req = httptest.NewRequest("POST", "/job", nil)
resp = httptest.NewRecorder()
svc.JobHandler(resp, req)
if resp.Code != http.StatusOK {
t.Error("Should be StatusOK", http.StatusText(resp.Code), resp.Body.String())
}

want := `{"Bucket":"fake-bucket","Experiment":"ndt","Datatype":"ndt5","Date":"2011-02-03T00:00:00Z"}`
if want != resp.Body.String() {
t.Fatal(resp.Body.String())
}
}

func TestResume(t *testing.T) {
// Fake time will avoid yesterday trigger.
now := time.Date(2011, 2, 16, 1, 2, 3, 4, time.UTC)
Expand Down Expand Up @@ -342,40 +282,19 @@ func TestEarlyWrapping(t *testing.T) {
must(t, err)

// If a job is still present in the tracker when it wraps, /job returns an error.
expected := []struct {
code int
body string
}{
{code: 200, body: `{"Bucket":"fake-bucket","Experiment":"ndt","Datatype":"ndt5","Date":"2011-02-03T00:00:00Z"}`},
{code: 200, body: `{"Bucket":"fake-bucket","Experiment":"ndt","Datatype":"tcpinfo","Date":"2011-02-03T00:00:00Z"}`},
{code: 200, body: `{"Bucket":"fake-bucket","Experiment":"ndt","Datatype":"ndt5","Date":"2011-02-04T00:00:00Z"}`},
{code: 200, body: `{"Bucket":"fake-bucket","Experiment":"ndt","Datatype":"tcpinfo","Date":"2011-02-04T00:00:00Z"}`},
expected := []tracker.Job{
tracker.Job{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", Date: start},
tracker.Job{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "tcpinfo", Date: start},
tracker.Job{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", Date: start.AddDate(0, 0, 1)},
tracker.Job{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "tcpinfo", Date: start.AddDate(0, 0, 1)},
// This one should work, because we complete it in the loop.
{code: 200, body: `{"Bucket":"fake-bucket","Experiment":"ndt","Datatype":"ndt5","Date":"2011-02-03T00:00:00Z"}`},
{code: 500, body: `Job already exists. Try again.`},
tracker.Job{Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", Date: start},
}

for k, result := range expected {
req := httptest.NewRequest("POST", "/job", nil)
resp := httptest.NewRecorder()
svc.JobHandler(resp, req)
if resp.Code != result.code {
t.Error(k, resp.Code, resp.Body.String())
}
if resp.Body.String() != result.body {
t.Error(k, "Got:", resp.Body.String(), "!=", result.body)
}

// TODO - this should be pulled into a separate test
if k == 2 {
job := tracker.Job{}
json.Unmarshal([]byte(expected[0].body), &job)
status, _ := tk.GetStatus(job)
status.NewState(tracker.Complete)
err := tk.UpdateJob(job, status)
if err != nil {
t.Error(err)
}
for _, realJob := range expected {
got := svc.NextJob(context.Background())
if got.Job != realJob {
t.Errorf("NextJob() wrong job: got %v, want %v", got.Job, realJob)
}
}
}
Expand Down
54 changes: 51 additions & 3 deletions tracker/handler.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package tracker

import (
"context"
"encoding/json"
"errors"
"log"
"net/http"
"net/url"
"time"

"github.com/m-lab/go/logx"
)

var (
MsgNoJobFound = "No job found. Try again."
MsgJobExists = "Job already exists. Try again."
)

// UpdateURL makes an update request URL.
func UpdateURL(base url.URL, job Job, state State, detail string) *url.URL {
base.Path += "update"
Expand Down Expand Up @@ -43,14 +50,19 @@ func ErrorURL(base url.URL, job Job, errString string) *url.URL {
return &base
}

type JobService interface {
NextJob(ctx context.Context) JobWithTarget
}

// Handler provides handlers for update, heartbeat, etc.
type Handler struct {
tracker *Tracker
tracker *Tracker
jobservice JobService
}

// NewHandler returns a Handler that sends updates to provided Tracker.
func NewHandler(tr *Tracker) *Handler {
return &Handler{tr}
func NewHandler(tr *Tracker, js JobService) *Handler {
return &Handler{tracker: tr, jobservice: js}
}

func getJob(jobString string) (Job, error) {
Expand Down Expand Up @@ -139,9 +151,45 @@ func (h *Handler) errorFunc(resp http.ResponseWriter, req *http.Request) {
resp.WriteHeader(http.StatusOK)
}

func (h *Handler) nextJob(resp http.ResponseWriter, req *http.Request) {
// Must be a post because it changes state.
if req.Method != http.MethodPost {
resp.WriteHeader(http.StatusMethodNotAllowed)
return
}
job := h.jobservice.NextJob(req.Context())

// Check for empty job (no job found with files)
if job.Date.Equal(time.Time{}) {
log.Println(MsgNoJobFound)
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte(MsgNoJobFound))
return
}

err := h.tracker.AddJob(job.Job)
if err != nil {
log.Println(err, job)
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte(MsgJobExists))
return
}

log.Printf("Dispatching %s\n", job.Job)
_, err = resp.Write(job.Marshal())
if err != nil {
log.Println(err)
// This should precede the Write(), but the Write failed, so this
// is likely ok.
resp.WriteHeader(http.StatusInternalServerError)
return
}
}

// Register registers the handlers on the server.
func (h *Handler) Register(mux *http.ServeMux) {
mux.HandleFunc("/heartbeat", h.heartbeat)
mux.HandleFunc("/update", h.update)
mux.HandleFunc("/error", h.errorFunc)
mux.HandleFunc("/job", h.nextJob)
}

0 comments on commit 025909c

Please sign in to comment.