Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Replace machinery with tasqueue #32

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"net/url"
"strconv"

"github.com/RichardKnop/machinery/v1/tasks"
"github.com/kalbhor/tasqueue/v2"
"github.com/zerodha/dungbeetle/models"
)

Expand Down Expand Up @@ -97,8 +97,8 @@ func (c *Client) DeleteGroupJob(jobID string, purge bool) error {
}

// GetPendingJobs fetches the list of pending jobs.
func (c *Client) GetPendingJobs(queue string) ([]tasks.Signature, error) {
var out []tasks.Signature
func (c *Client) GetPendingJobs(queue string) ([]tasqueue.JobMessage, error) {
var out []tasqueue.JobMessage
err := c.doHTTPReq(http.MethodGet,
fmt.Sprintf(uriGetPendingJobs, queue), nil, nil, &out)
return out, err
Expand Down
7 changes: 3 additions & 4 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestGetPendingJobs(t *testing.T) {
assert.NoError(t, err, "error fetching pending jobs")
assert.Equal(t, len(jobs), len(r), "incorrect number of pending jobs")
for i, j := range jobs {
assert.Equal(t, j.JobID, r[i].UUID, "job name doesn't match")
assert.Equal(t, j.JobID, r[i].ID, "job name doesn't match")
}
}

Expand All @@ -77,9 +77,8 @@ func TestDeleteJob(t *testing.T) {

func TestPostJobGroup(t *testing.T) {
_, err := cl.PostJobGroup(models.GroupReq{
GroupID: "testgroup",
Concurrency: 1,
Jobs: jobs,
GroupID: "testgroup",
Jobs: jobs,
})
assert.NoError(t, err, "error posting job group")
}
Expand Down
38 changes: 24 additions & 14 deletions cmd/config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
# Redis addresses are denoted as: redis://password@host:port/db_num

[job_queue]
broker_address = "redis://127.0.0.1:6379/1"

# This is where Machinery stores it's job states.
# In Machinery's terminology, this is its "result backend"
[job_queue.broker]
addresses = ["localhost:6379"]
password = ""
db = 1
max_active = 50
max_idle = 20
dial_timeout = "1s"
read_timeout = "1s"
write_timeout = "1s"

# This is where Tasqueue stores it's job states.
# In Tasqueue's terminology, this is its "result backend"
# and is not to be confused with the DungBeetle's 'result_backend'
# where DungBeetle stores results of jobs, bypassing machinery.
state_address = "redis://127.0.0.1:6379/1"

# TTL in seconds after which a job entry and its meta disappears from the Machinery
# store. This does not remove results from the results DB.
state_ttl = "30m"

# where DungBeetle stores results of jobs, bypassing tasqueue.
[job_queue.results]
addresses = ["localhost:6379"]
password = ""
db = 1
max_active = 50
max_idle = 20
dial_timeout = "1s"
read_timeout = "1s"
write_timeout = "1s"
expiry = "30s"
meta_expiry = "3600s"

# These are the result backends where the results of various SQL query jobs are saved.
# There can be more than one backends defined here, for eg: [results.my1], [db.my2] ...
Expand Down
42 changes: 36 additions & 6 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ import (
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"time"

bredis "github.com/kalbhor/tasqueue/v2/brokers/redis"
rredis "github.com/kalbhor/tasqueue/v2/results/redis"

"github.com/go-chi/chi/v5"
"github.com/knadh/koanf/v2"
"github.com/zerodha/dungbeetle/internal/core"
Expand Down Expand Up @@ -118,16 +122,42 @@ func initCore(ko *koanf.Koanf) *core.Core {
backends[name] = backend
}

lo := slog.Default()

rBroker := bredis.New(bredis.Options{
PollPeriod: bredis.DefaultPollPeriod,
Addrs: ko.MustStrings("job_queue.broker.addresses"),
Password: ko.String("job_queue.broker.password"),
DB: ko.Int("job_queue.broker.db"),
MinIdleConns: ko.MustInt("job_queue.broker.max_idle"),
DialTimeout: ko.MustDuration("job_queue.broker.dial_timeout"),
ReadTimeout: ko.MustDuration("job_queue.broker.read_timeout"),
WriteTimeout: ko.MustDuration("job_queue.broker.write_timeout"),
}, lo)

rResult := rredis.New(rredis.Options{
Addrs: ko.MustStrings("job_queue.results.addresses"),
Password: ko.String("job_queue.results.password"),
DB: ko.Int("job_queue.results.db"),
MinIdleConns: ko.MustInt("job_queue.results.max_idle"),
DialTimeout: ko.MustDuration("job_queue.results.dial_timeout"),
ReadTimeout: ko.MustDuration("job_queue.results.read_timeout"),
WriteTimeout: ko.MustDuration("job_queue.results.write_timeout"),
Expiry: ko.Duration("job_queue.results.expiry"),
MetaExpiry: ko.Duration("job_queue.results.meta_expiry"),
}, lo)

// Initialize the server and load SQL tasks.
co := core.New(core.Opt{
DefaultQueue: ko.MustString("queue"),
DefaultJobTTL: time.Second * 10,
QueueBrokerDSN: ko.MustString("job_queue.broker_address"),
QueueStateDSN: ko.MustString("job_queue.state_address"),
QueueStateTTL: ko.MustDuration("job_queue.state_ttl"),
DefaultQueue: ko.MustString("queue"),
DefaultJobTTL: time.Second * 10,
DefaultGroupConcurrency: 1,
Results: rResult,
Broker: rBroker,
}, srcPool, backends, lo)
if err := co.LoadTasks(ko.MustStrings("sql-directory")); err != nil {
lo.Fatal(err)
lo.Error("could not load tasks", "error", err)
return nil
}

return co
Expand Down
5 changes: 4 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -98,8 +99,10 @@ func main() {
go initHTTP(co)
}

ctx := context.Background()

// Start the core.
if err := co.Start(ko.MustString("worker-name"), ko.MustInt("worker-concurrency")); err != nil {
if err := co.Start(ctx, ko.MustString("worker-name"), ko.MustInt("worker-concurrency")); err != nil {
lo.Fatal(err)
}
}
73 changes: 22 additions & 51 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
module github.com/zerodha/dungbeetle

go 1.20
go 1.21

require (
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/RichardKnop/machinery v1.10.6
github.com/go-chi/chi/v5 v5.0.10
github.com/go-sql-driver/mysql v1.7.1
github.com/gofrs/uuid/v5 v5.0.0
github.com/gomodule/redigo v2.0.0+incompatible
github.com/kalbhor/tasqueue/v2 v2.1.0
github.com/knadh/goyesql/v2 v2.2.0
github.com/knadh/koanf/parsers/toml v0.1.0
github.com/knadh/koanf/providers/env v0.1.0
Expand All @@ -18,67 +17,39 @@ require (
github.com/lib/pq v1.10.9
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/vmihailenco/msgpack v4.0.4+incompatible
)

require (
cloud.google.com/go v0.81.0 // indirect
cloud.google.com/go/pubsub v1.10.3 // indirect
github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae // indirect
github.com/aws/aws-sdk-go v1.38.26 // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-redis/redis/v8 v8.8.2 // indirect
github.com/go-redsync/redsync/v4 v4.3.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.12.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.0.2 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v0.20.0 // indirect
go.opentelemetry.io/otel/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/api v0.45.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/otel v1.18.0 // indirect
go.opentelemetry.io/otel/metric v1.18.0 // indirect
go.opentelemetry.io/otel/sdk v1.18.0 // indirect
go.opentelemetry.io/otel/trace v1.18.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.11.0 // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading