Skip to content

Commit

Permalink
refactor: Replace machinery with tasqueue (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbhor committed Sep 21, 2023
1 parent 3dca8e5 commit 09e09f0
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 939 deletions.
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"net/url"
"strconv"

"github.com/RichardKnop/machinery/v1/tasks"
"github.com/kalbhor/tasqueue/v2"
"github.com/zerodha/dungbeetle/models"
)

Expand Down Expand Up @@ -97,8 +97,8 @@ func (c *Client) DeleteGroupJob(jobID string, purge bool) error {
}

// GetPendingJobs fetches the list of pending jobs.
func (c *Client) GetPendingJobs(queue string) ([]tasks.Signature, error) {
var out []tasks.Signature
func (c *Client) GetPendingJobs(queue string) ([]tasqueue.JobMessage, error) {
var out []tasqueue.JobMessage
err := c.doHTTPReq(http.MethodGet,
fmt.Sprintf(uriGetPendingJobs, queue), nil, nil, &out)
return out, err
Expand Down
7 changes: 3 additions & 4 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestGetPendingJobs(t *testing.T) {
assert.NoError(t, err, "error fetching pending jobs")
assert.Equal(t, len(jobs), len(r), "incorrect number of pending jobs")
for i, j := range jobs {
assert.Equal(t, j.JobID, r[i].UUID, "job name doesn't match")
assert.Equal(t, j.JobID, r[i].ID, "job name doesn't match")
}
}

Expand All @@ -77,9 +77,8 @@ func TestDeleteJob(t *testing.T) {

func TestPostJobGroup(t *testing.T) {
_, err := cl.PostJobGroup(models.GroupReq{
GroupID: "testgroup",
Concurrency: 1,
Jobs: jobs,
GroupID: "testgroup",
Jobs: jobs,
})
assert.NoError(t, err, "error posting job group")
}
Expand Down
38 changes: 24 additions & 14 deletions cmd/config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
# Redis addresses are denoted as: redis://password@host:port/db_num

[job_queue]
broker_address = "redis://127.0.0.1:6379/1"

# This is where Machinery stores it's job states.
# In Machinery's terminology, this is its "result backend"
[job_queue.broker]
addresses = ["localhost:6379"]
password = ""
db = 1
max_active = 50
max_idle = 20
dial_timeout = "1s"
read_timeout = "1s"
write_timeout = "1s"

# This is where Tasqueue stores it's job states.
# In Tasqueue's terminology, this is its "result backend"
# and is not to be confused with the DungBeetle's 'result_backend'
# where DungBeetle stores results of jobs, bypassing machinery.
state_address = "redis://127.0.0.1:6379/1"

# TTL in seconds after which a job entry and its meta disappears from the Machinery
# store. This does not remove results from the results DB.
state_ttl = "30m"

# where DungBeetle stores results of jobs, bypassing tasqueue.
[job_queue.results]
addresses = ["localhost:6379"]
password = ""
db = 1
max_active = 50
max_idle = 20
dial_timeout = "1s"
read_timeout = "1s"
write_timeout = "1s"
expiry = "30s"
meta_expiry = "3600s"

# These are the result backends where the results of various SQL query jobs are saved.
# There can be more than one backends defined here, for eg: [results.my1], [db.my2] ...
Expand Down
42 changes: 36 additions & 6 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ import (
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"time"

bredis "github.com/kalbhor/tasqueue/v2/brokers/redis"
rredis "github.com/kalbhor/tasqueue/v2/results/redis"

"github.com/go-chi/chi/v5"
"github.com/knadh/koanf/v2"
"github.com/zerodha/dungbeetle/internal/core"
Expand Down Expand Up @@ -118,16 +122,42 @@ func initCore(ko *koanf.Koanf) *core.Core {
backends[name] = backend
}

lo := slog.Default()

rBroker := bredis.New(bredis.Options{
PollPeriod: bredis.DefaultPollPeriod,
Addrs: ko.MustStrings("job_queue.broker.addresses"),
Password: ko.String("job_queue.broker.password"),
DB: ko.Int("job_queue.broker.db"),
MinIdleConns: ko.MustInt("job_queue.broker.max_idle"),
DialTimeout: ko.MustDuration("job_queue.broker.dial_timeout"),
ReadTimeout: ko.MustDuration("job_queue.broker.read_timeout"),
WriteTimeout: ko.MustDuration("job_queue.broker.write_timeout"),
}, lo)

rResult := rredis.New(rredis.Options{
Addrs: ko.MustStrings("job_queue.results.addresses"),
Password: ko.String("job_queue.results.password"),
DB: ko.Int("job_queue.results.db"),
MinIdleConns: ko.MustInt("job_queue.results.max_idle"),
DialTimeout: ko.MustDuration("job_queue.results.dial_timeout"),
ReadTimeout: ko.MustDuration("job_queue.results.read_timeout"),
WriteTimeout: ko.MustDuration("job_queue.results.write_timeout"),
Expiry: ko.Duration("job_queue.results.expiry"),
MetaExpiry: ko.Duration("job_queue.results.meta_expiry"),
}, lo)

// Initialize the server and load SQL tasks.
co := core.New(core.Opt{
DefaultQueue: ko.MustString("queue"),
DefaultJobTTL: time.Second * 10,
QueueBrokerDSN: ko.MustString("job_queue.broker_address"),
QueueStateDSN: ko.MustString("job_queue.state_address"),
QueueStateTTL: ko.MustDuration("job_queue.state_ttl"),
DefaultQueue: ko.MustString("queue"),
DefaultJobTTL: time.Second * 10,
DefaultGroupConcurrency: 1,
Results: rResult,
Broker: rBroker,
}, srcPool, backends, lo)
if err := co.LoadTasks(ko.MustStrings("sql-directory")); err != nil {
lo.Fatal(err)
lo.Error("could not load tasks", "error", err)
return nil
}

return co
Expand Down
5 changes: 4 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -98,8 +99,10 @@ func main() {
go initHTTP(co)
}

ctx := context.Background()

// Start the core.
if err := co.Start(ko.MustString("worker-name"), ko.MustInt("worker-concurrency")); err != nil {
if err := co.Start(ctx, ko.MustString("worker-name"), ko.MustInt("worker-concurrency")); err != nil {
lo.Fatal(err)
}
}
73 changes: 22 additions & 51 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
module github.com/zerodha/dungbeetle

go 1.20
go 1.21

require (
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/RichardKnop/machinery v1.10.6
github.com/go-chi/chi/v5 v5.0.10
github.com/go-sql-driver/mysql v1.7.1
github.com/gofrs/uuid/v5 v5.0.0
github.com/gomodule/redigo v2.0.0+incompatible
github.com/kalbhor/tasqueue/v2 v2.1.0
github.com/knadh/goyesql/v2 v2.2.0
github.com/knadh/koanf/parsers/toml v0.1.0
github.com/knadh/koanf/providers/env v0.1.0
Expand All @@ -18,67 +17,39 @@ require (
github.com/lib/pq v1.10.9
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/vmihailenco/msgpack v4.0.4+incompatible
)

require (
cloud.google.com/go v0.81.0 // indirect
cloud.google.com/go/pubsub v1.10.3 // indirect
github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae // indirect
github.com/aws/aws-sdk-go v1.38.26 // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-redis/redis/v8 v8.8.2 // indirect
github.com/go-redsync/redsync/v4 v4.3.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.12.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.0.2 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v0.20.0 // indirect
go.opentelemetry.io/otel/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/api v0.45.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/otel v1.18.0 // indirect
go.opentelemetry.io/otel/metric v1.18.0 // indirect
go.opentelemetry.io/otel/sdk v1.18.0 // indirect
go.opentelemetry.io/otel/trace v1.18.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.11.0 // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 09e09f0

Please sign in to comment.