Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes in rebrand #34

Merged
merged 9 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.12-alpine AS builder
FROM golang:1.21-alpine AS builder
RUN apk update && apk add make git
WORKDIR /dungbeetle/
COPY ./ ./
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ run:
.PHONY: dist
dist: build

# Run tests in sequence
.PHONY: test
test:
go test ./...
go test ./... -v -p 1

# Use goreleaser to do a dry run producing local builds.
.PHONY: release-dry
Expand Down
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -48,7 +48,7 @@ type httpResp struct {
type Opt struct {
RootURL string
HTTPClient *http.Client
Logger *log.Logger
Logger *slog.Logger
}

// Client represents the SQL Jobber API client.
Expand Down
105 changes: 86 additions & 19 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,45 @@
package client

import (
"log"
"context"
"log/slog"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/zerodha/dungbeetle/internal/core"
"github.com/zerodha/dungbeetle/models"
)

var (
cl *Client
eta = time.Now().Add(time.Hour)
jobs = []models.JobReq{{
JobID: "job1",
jobs = []models.JobReq{
{
JobID: "job1",
TaskName: "get_profit_summary",
Queue: "test",
Retries: 4,
}, {
JobID: "job2",
TaskName: "get_profit_summary",
Queue: "test",
Retries: 3,
}}

jobsPending = []models.JobReq{{
JobID: "job3",
TaskName: "get_profit_summary",
Queue: "test",
Queue: "test_pending",
Retries: 3,
}, {
JobID: "job2",
JobID: "job4",
TaskName: "get_profit_summary",
Queue: "test",
Queue: "test_pending",
Retries: 3,
}}
},
}
)

func init() {
Expand All @@ -37,10 +52,62 @@ func init() {
ResponseHeaderTimeout: time.Second * 5,
},
},
Logger: log.New(os.Stdout, "", 0),
Logger: slog.Default(),
})
}

func TestSlowQuery(t *testing.T) {
group := models.GroupReq{
GroupID: "slow_group",
Jobs: []models.JobReq{{
JobID: "job-slow",
TaskName: "slow_query",
Queue: "test",
Retries: 3,
// 5 here means the duration (in seconds) the query will
// take to execute
Args: []string{"4"},
},
},
}

// Submit the group
r, err := cl.PostJobGroup(group)
assert.NoError(t, err, "error posting group")
assert.Equal(t, group.GroupID, r.GroupID)

var (
count = 0
// To check for status every second
tk = time.NewTicker(time.Second)
// Check for result max twice
checkCount = 2
// The ctx that will trigger after the job is done
ctx, _ = context.WithTimeout(context.Background(), time.Second*5)
)

for {
select {
case <-tk.C:
if count >= checkCount {
tk.Stop()
}
resp, err := cl.GetGroupStatus(r.GroupID)
assert.NoError(t, err, "error getting group status")
if resp.State != core.StatusPending && resp.State != core.StatusStarted {
t.Fatalf("expected (%s or %s), got %s", core.StatusPending, core.StatusStarted, resp.State)
}
count++
case <-ctx.Done():
resp, err := cl.GetGroupStatus(r.GroupID)
assert.NoError(t, err, "error getting group status")
assert.Equal(t, core.StatusSuccess, resp.State)
return
}
}
//assert.Equal(t, group.GroupID, r.GroupID)
}

func TestPostJob(t *testing.T) {
for _, j := range jobs {
r, err := cl.PostJob(j)
Expand All @@ -60,17 +127,20 @@ func TestGetJobStatus(t *testing.T) {
}

func TestGetPendingJobs(t *testing.T) {
r, err := cl.GetPendingJobs("test")
assert.NoError(t, err, "error fetching pending jobs")
assert.Equal(t, len(jobs), len(r), "incorrect number of pending jobs")
for i, j := range jobs {
assert.Equal(t, j.JobID, r[i].ID, "job name doesn't match")
for _, j := range jobsPending {
r, err := cl.PostJob(j)
assert.NoError(t, err, "error posting job")
assert.Equal(t, j.JobID, r.JobID)
}

r, err := cl.GetPendingJobs("test_pending")
assert.NoError(t, err, "error fetching pending jobs")
assert.Equal(t, len(jobsPending), len(r), "incorrect number of pending jobs")
}

func TestDeleteJob(t *testing.T) {
for _, j := range jobs {
err := cl.DeleteJob(j.JobID, false)
err := cl.DeleteJob(j.JobID, true)
assert.NoError(t, err, "error deleting job")
}
}
Expand All @@ -83,9 +153,6 @@ func TestPostJobGroup(t *testing.T) {
assert.NoError(t, err, "error posting job group")
}
func TestGetJobGroupStatus(t *testing.T) {
r, err := cl.GetGroupStatus("testgroup")
_, err := cl.GetGroupStatus("testgroup")
assert.NoError(t, err, "error getting job group status")
for i, j := range jobs {
assert.Equal(t, j.JobID, r.Jobs[i].JobID)
}
}
6 changes: 6 additions & 0 deletions cmd/config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
[app]
# log level is one of : INFO, DEBUG, ERROR
log_level = "INFO"
# maximum time allowed for a job to run once it has started execution
default_job_ttl = "60s"

# The broker that manages job queuing.
# Currently, only "redis" is supported.
[job_queue.broker]
Expand Down
25 changes: 15 additions & 10 deletions cmd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"github.com/zerodha/dungbeetle/models"
)

// groupConcurrency represents the concurrency factor for job groups.
const groupConcurrency = 5

// reValidateName represents the character classes allowed in a job ID.
var reValidateName = regexp.MustCompile("(?i)^[a-z0-9-_:]+$")

Expand All @@ -34,12 +31,11 @@ func handleGetTasksList(w http.ResponseWriter, r *http.Request) {

// Just the names.
out := make([]string, 0, len(tasks))
for name, _ := range tasks {
for name := range tasks {
out = append(out, name)
}

sendResponse(w, out)
return
}

// handleGetJobStatus returns the status of a given jobID.
Expand All @@ -52,6 +48,7 @@ func handleGetJobStatus(w http.ResponseWriter, r *http.Request) {

out, err := co.GetJobStatus(jobID)
if err != nil {
lo.Error("could not get job status", "error", err, "job_id", jobID)
sendErrorResponse(w, "job not found", http.StatusNotFound)
return
}
Expand All @@ -69,6 +66,7 @@ func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {

out, err := co.GetJobGroupStatus(groupID)
if err != nil {
lo.Error("could not get group job status", "error", err, "group_id", groupID)
sendErrorResponse(w, err.Error(), http.StatusNotFound)
return
}
Expand All @@ -79,11 +77,13 @@ func handleGetGroupStatus(w http.ResponseWriter, r *http.Request) {
// handleGetPendingJobs returns pending jobs in a given queue.
func handleGetPendingJobs(w http.ResponseWriter, r *http.Request) {
var (
co = r.Context().Value("core").(*core.Core)
co = r.Context().Value("core").(*core.Core)
queue = chi.URLParam(r, "queue")
)

out, err := co.GetPendingJobs(chi.URLParam(r, "queue"))
out, err := co.GetPendingJobs(queue)
if err != nil {
lo.Error("could not get pending jobs", "error", err, "queue", queue)
sendErrorResponse(w, "error fetching pending tasks", http.StatusInternalServerError)
return
}
Expand All @@ -100,6 +100,7 @@ func handlePostJob(w http.ResponseWriter, r *http.Request) {
)

if r.ContentLength == 0 {
lo.Error("request body sent empty")
sendErrorResponse(w, "request body is empty", http.StatusBadRequest)
return
}
Expand All @@ -109,7 +110,7 @@ func handlePostJob(w http.ResponseWriter, r *http.Request) {
req models.JobReq
)
if err := decoder.Decode(&req); err != nil {
lo.Printf("error parsing request JSON: %v", err)
lo.Error("error parsing request JSON", "error", err, "task_name", taskName, "request", req)
sendErrorResponse(w, "error parsing request JSON", http.StatusBadRequest)
return
}
Expand All @@ -122,6 +123,7 @@ func handlePostJob(w http.ResponseWriter, r *http.Request) {
// Create the job signature.
out, err := co.NewJob(req, taskName)
if err != nil {
lo.Error("could not create new job", "error", err, "task_name", taskName, "request", req)
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -139,14 +141,14 @@ func handlePostJobGroup(w http.ResponseWriter, r *http.Request) {
)

if err := decoder.Decode(&req); err != nil {
lo.Printf("error parsing JSON body: %v", err)
lo.Error("error parsing JSON body", "error", err, "request", req)
sendErrorResponse(w, "error parsing JSON body", http.StatusBadRequest)
return
}

out, err := co.NewJobGroup(req)
if err != nil {
lo.Printf("error creating job signature: %v", err)
lo.Error("error creating job signature", "error", err, "request", req)
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -165,6 +167,7 @@ func handleCancelJob(w http.ResponseWriter, r *http.Request) {
)

if err := co.CancelJob(jobID, purge); err != nil {
lo.Error("could not cancel job", "error", err, "job_id", jobID)
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
}

Expand All @@ -183,6 +186,7 @@ func handleCancelGroupJob(w http.ResponseWriter, r *http.Request) {

// Get state of group.
if err := co.CancelJobGroup(groupID, purge); err != nil {
lo.Error("could not cancel group job", "error", err, "group_id", groupID)
sendErrorResponse(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -195,6 +199,7 @@ func sendResponse(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
out, err := json.Marshal(models.HTTPResp{Status: "success", Data: data})
if err != nil {
lo.Error("could marshal response", "error", err)
sendErrorResponse(w, "Internal Server Error", http.StatusInternalServerError)
return
}
Expand Down
Loading