Skip to content

Commit

Permalink
Merge 75efff1 into 19ec9eb
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-soltesz committed Jul 29, 2022
2 parents 19ec9eb + 75efff1 commit c0cb546
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 144 deletions.
98 changes: 6 additions & 92 deletions tracker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package tracker

import (
"context"
"encoding/json"
"errors"
"log"
"net/http"
"net/url"
"time"

"github.com/m-lab/go/logx"
Expand All @@ -17,42 +14,6 @@ var (
MsgJobExists = "Job already exists. Try again."
)

// UpdateURL makes an update request URL.
// TODO(soltesz): move client functions to client package.
func UpdateURL(base url.URL, job Job, state State, detail string) *url.URL {
base.Path += "update"
params := make(url.Values, 3)
params.Add("job", string(job.Marshal()))
params.Add("state", string(state))
params.Add("detail", detail)

base.RawQuery = params.Encode()
return &base
}

// HeartbeatURL makes an update request URL.
// TODO(soltesz): move client functions to client package.
func HeartbeatURL(base url.URL, job Job) *url.URL {
base.Path += "heartbeat"
params := make(url.Values, 3)
params.Add("job", string(job.Marshal()))

base.RawQuery = params.Encode()
return &base
}

// ErrorURL makes an update request URL.
// TODO(soltesz): move client functions to client package.
func ErrorURL(base url.URL, job Job, errString string) *url.URL {
base.Path += "error"
params := make(url.Values, 3)
params.Add("job", string(job.Marshal()))
params.Add("error", errString)

base.RawQuery = params.Encode()
return &base
}

type JobService interface {
NextJob(ctx context.Context) *JobWithTarget
}
Expand All @@ -68,34 +29,6 @@ func NewHandler(tr *Tracker, js JobService) *Handler {
return &Handler{tracker: tr, jobservice: js}
}

func getJob(jobString string) (Job, error) {
var job Job
if jobString == "" {
return job, errors.New("Empty job")
}
err := json.Unmarshal([]byte(jobString), &job)
return job, err
}

func getID(req *http.Request) (Key, error) {
// Prefer the /v2 "id" parameter.
id := req.Form.Get("id")
if id != "" {
return Key(id), nil
}

// Fallback to the "job" parameter used by the original /job api.
j := req.Form.Get("job")
if j == "" {
return "", errors.New("no job id found")
}
job, err := getJob(j)
if err != nil {
return "", err
}
return job.Key(), nil
}

func (h *Handler) heartbeat(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
resp.WriteHeader(http.StatusMethodNotAllowed)
Expand All @@ -105,8 +38,8 @@ func (h *Handler) heartbeat(resp http.ResponseWriter, req *http.Request) {
resp.WriteHeader(http.StatusBadRequest)
return
}
id, err := getID(req)
if err != nil {
id := Key(req.Form.Get("id"))
if id == "" {
resp.WriteHeader(http.StatusUnprocessableEntity)
return
}
Expand All @@ -127,8 +60,8 @@ func (h *Handler) update(resp http.ResponseWriter, req *http.Request) {
resp.WriteHeader(http.StatusBadRequest)
return
}
id, err := getID(req)
if err != nil {
id := Key(req.Form.Get("id"))
if id == "" {
resp.WriteHeader(http.StatusUnprocessableEntity)
return
}
Expand Down Expand Up @@ -156,9 +89,9 @@ func (h *Handler) errorFunc(resp http.ResponseWriter, req *http.Request) {
resp.WriteHeader(http.StatusBadRequest)
return
}
id := Key(req.Form.Get("id"))
jobErr := req.Form.Get("error")
id, err := getID(req)
if err != nil {
if id == "" {
resp.WriteHeader(http.StatusUnprocessableEntity)
return
}
Expand Down Expand Up @@ -199,20 +132,6 @@ func (h *Handler) nextJob(resp http.ResponseWriter, req *http.Request) *JobWithT
return jt
}

// nextJobV1 returns the next JobWithTarget and returns the tracker.Job to the requesting client.
func (h *Handler) nextJobV1(resp http.ResponseWriter, req *http.Request) {
jt := h.nextJob(resp, req)
if jt == nil {
return
}
log.Printf("Dispatching %s\n", jt.Job)
_, err := resp.Write(jt.Job.Marshal())
if err != nil {
log.Println(err)
return
}
}

// nextJobV2 returns the next JobWithTarget to the requesting client.
func (h *Handler) nextJobV2(resp http.ResponseWriter, req *http.Request) {
jt := h.nextJob(resp, req)
Expand All @@ -229,11 +148,6 @@ func (h *Handler) nextJobV2(resp http.ResponseWriter, req *http.Request) {

// Register registers the handlers on the server.
func (h *Handler) Register(mux *http.ServeMux) {
mux.HandleFunc("/heartbeat", h.heartbeat)
mux.HandleFunc("/update", h.update)
mux.HandleFunc("/error", h.errorFunc)
mux.HandleFunc("/job", h.nextJobV1)

mux.HandleFunc("/v2/job/heartbeat", h.heartbeat)
mux.HandleFunc("/v2/job/update", h.update)
mux.HandleFunc("/v2/job/error", h.errorFunc)
Expand Down
140 changes: 88 additions & 52 deletions tracker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/m-lab/go/logx"

gardener "github.com/m-lab/etl-gardener/client/v2"
"github.com/m-lab/etl-gardener/persistence"
"github.com/m-lab/etl-gardener/tracker"
"github.com/m-lab/etl-gardener/tracker/jobtest"
Expand Down Expand Up @@ -87,26 +88,46 @@ func TestUpdateHandler(t *testing.T) {
job := jobtest.NewJob("bucket", "exp", "type", date)
server, tk := testSetup(t, []tracker.Job{job})

url := tracker.UpdateURL(server, job, tracker.Parsing, "foobar")
ctx := context.Background()
c := gardener.NewJobClient(server)

getAndExpect(t, url, http.StatusMethodNotAllowed)

// Fail if job doesn't exist.
postAndExpect(t, url, http.StatusGone)
// Attempt to update job that is not yet in tracker.
err := c.Update(ctx, job.Key(), tracker.Init, "foo")
if err == nil {
t.Fatal(err)
}
err = c.Update(ctx, job.Key(), "", "foo")
if err == nil {
t.Fatal(err)
}
err = c.Update(ctx, "", tracker.Init, "foo")
if err == nil {
t.Fatal(err)
}

// Add job to tracker.
tk.AddJob(job)

// should update state to Parsing
postAndExpect(t, url, http.StatusOK)
// Update job state.
err = c.Update(ctx, job.Key(), tracker.Parsing, "foo")
if err != nil {
t.Fatal(err)
}

// Confirm status.
stat, err := tk.GetStatus(job.Key())
must(t, err)
if stat.State() != tracker.Parsing {
t.Fatal("update failed", stat)
}

url = tracker.UpdateURL(server, job, tracker.Complete, "")
postAndExpect(t, url, http.StatusOK)
// Update job again as complete.
err = c.Update(ctx, job.Key(), tracker.Complete, "")
if err != nil {
t.Fatal(err)
}

// Verify job is no longer present.
_, err = tk.GetStatus(job.Key())
if err != tracker.ErrJobNotFound {
t.Fatal("Expected JobNotFound", err)
Expand All @@ -119,27 +140,44 @@ func TestHeartbeatHandler(t *testing.T) {
job := jobtest.NewJob("bucket", "exp", "type", date)
server, tk := testSetup(t, []tracker.Job{job})

url := tracker.HeartbeatURL(server, job)

getAndExpect(t, url, http.StatusMethodNotAllowed)
ctx := context.Background()
c := gardener.NewJobClient(server)

// Fail if job doesn't exist.
postAndExpect(t, url, http.StatusGone)
// Attempt to send heartbeat for job that is not yet in tracker.
err := c.Heartbeat(ctx, job.Key())
if err == nil {
t.Fatal(err)
}
err = c.Heartbeat(ctx, "")
if err == nil {
t.Fatal(err)
}

// Add job to tracker.
tk.AddJob(job)

// should update state to Parsing
postAndExpect(t, url, http.StatusOK)
// Send heartbeat for job.
err = c.Heartbeat(ctx, job.Key())
if err != nil {
t.Fatal(err)
}

// Get job status.
stat, err := tk.GetStatus(job.Key())
must(t, err)
if err != nil {
t.Fatal(err)
}
if time.Since(stat.HeartbeatTime) > 1*time.Second {
t.Fatal("heartbeat failed", stat)
}
t.Log(stat)

url = tracker.UpdateURL(server, job, tracker.Complete, "")
postAndExpect(t, url, http.StatusOK)
// Update job again as complete.
err = c.Update(ctx, job.Key(), tracker.Complete, "")
if err != nil {
t.Fatal(err)
}

// Verify job is no longer present.
_, err = tk.GetStatus(job.Key())
if err != tracker.ErrJobNotFound {
t.Fatal("Expected JobNotFound", err)
Expand All @@ -151,59 +189,57 @@ func TestErrorHandler(t *testing.T) {
job := jobtest.NewJob("bucket", "exp", "type", date)
server, tk := testSetup(t, []tracker.Job{job})

url := tracker.ErrorURL(server, job, "error")

getAndExpect(t, url, http.StatusMethodNotAllowed)
ctx := context.Background()
c := gardener.NewJobClient(server)

// Job should not yet exist.
postAndExpect(t, url, http.StatusGone)
err := c.Error(ctx, job.Key(), "no such job")
if err == nil {
t.Fatal(err)
}
// ID is empty.
err = c.Error(ctx, "", "")
if err == nil {
t.Fatal(err)
}
// error message is empty.
err = c.Error(ctx, job.Key(), "")
if err == nil {
t.Fatal(err)
}

// Add job to tracker.
tk.AddJob(job)

// should successfully update state to Failed
postAndExpect(t, url, http.StatusOK)
err = c.Error(ctx, job.Key(), "error")
if err != nil {
t.Fatal(err)
}

// Verify error is updated in tracker.
stat, err := tk.GetStatus(job.Key())
must(t, err)
if err != nil {
t.Fatal(err)
}
if stat.Detail() != "error" {
t.Error("Expected error:", stat.Detail())
}
if stat.State() != tracker.ParseError {
t.Error("Wrong state:", stat)
}

url = tracker.UpdateURL(server, job, tracker.Complete, "")
postAndExpect(t, url, http.StatusOK)
err = c.Update(ctx, job.Key(), tracker.Complete, "")
if err != nil {
t.Fatal(err)
}

_, err = tk.GetStatus(job.Key())
if err != tracker.ErrJobNotFound {
t.Fatal("Expected JobNotFound", err)
}
}

func TestNextJobHandler(t *testing.T) {
date := time.Date(2019, 01, 02, 0, 0, 0, 0, time.UTC)
job := jobtest.NewJob("bucket", "exp", "type", date)
// Add job, empty, and duplicate job.
url, _ := testSetup(t, []tracker.Job{job, tracker.Job{}, job})
url.Path = "job"

// Wrong method.
getAndExpect(t, &url, http.StatusMethodNotAllowed)

// This should succeed, because the fakeJobService returns its job.
r := postAndExpect(t, &url, http.StatusOK)
want := `{"Bucket":"bucket","Experiment":"exp","Datatype":"type","Date":"2019-01-02T00:00:00Z"}`
if want != r {
t.Fatalf("/job returned wrong result: got %q, want %q", r, want)
}

// This one should fail because the fakeJobService returns empty results.
postAndExpect(t, &url, http.StatusInternalServerError)

// This one should fail because the fakeJobService returns a duplicate job.
postAndExpect(t, &url, http.StatusInternalServerError)
}

func TestNextJobV2Handler(t *testing.T) {
date := time.Date(2019, 01, 02, 0, 0, 0, 0, time.UTC)
job := jobtest.NewJob("bucket", "exp", "type", date)
Expand Down

0 comments on commit c0cb546

Please sign in to comment.