Skip to content

Commit

Permalink
feature: JOBS v2 API support
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
  • Loading branch information
rustatian committed Jun 20, 2023
1 parent 3381fab commit 9643105
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 60 deletions.
18 changes: 12 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,35 @@ go 1.20
require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/nats-io/nats.go v1.26.0
github.com/nats-io/nats.go v1.27.0
github.com/roadrunner-server/api/v4 v4.3.2
github.com/roadrunner-server/endure/v2 v2.2.1
github.com/roadrunner-server/errors v1.2.0
github.com/roadrunner-server/sdk/v4 v4.2.6
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
)

replace (
github.com/roadrunner-server/api/v4 => ../../api
github.com/roadrunner-server/sdk/v4 => ../../sdk
)

require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/nats-io/nats-server/v2 v2.7.4 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/roadrunner-server/tcplisten v1.3.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/sys v0.9.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
34 changes: 19 additions & 15 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,35 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA6wqCEaM=
github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats.go v1.26.0 h1:fWJTYPnZ8DzxIaqIHOAMfColuznchnd5Ab5dbJpgPIE=
github.com/nats-io/nats.go v1.26.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats.go v1.27.0 h1:3o9fsPhmoKm+yK7rekH2GtWoE+D9jFbw8N3/ayI1C00=
github.com/nats-io/nats.go v1.27.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/roadrunner-server/api/v4 v4.3.2 h1:1zMfd2P+i9hTDFIsp9nEhCLNe+GmUrUqgnON8ZUO6Mc=
github.com/roadrunner-server/api/v4 v4.3.2/go.mod h1:HFb1kQ/H5UkD7MBNqi4L7hXQTtc919FcO8JKPqoSVzs=
github.com/roadrunner-server/endure/v2 v2.2.1 h1:OkJUSd6+qqTcnl8in3bbyidEOmhO3B9uOVdR0avba28=
github.com/roadrunner-server/endure/v2 v2.2.1/go.mod h1:4eTAr3fASpdyqgFcbqVckOx68dZ4YPECecrcHvAuSdU=
github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM06GUDcQBbI=
github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY=
github.com/roadrunner-server/sdk/v4 v4.2.6 h1:BSQ+HHklszJKGCo91jqRwvgjhSkuz097cbPHMFAhIfo=
github.com/roadrunner-server/sdk/v4 v4.2.6/go.mod h1:WBLEsz9EMY6CkwpdeageMEPLevD/PaUf4rOOsBsaKlo=
github.com/roadrunner-server/tcplisten v1.3.0 h1:VDd6IbP8oIjm5vKvMVozeZgeHgOcoP0XYLOyOqcZHCY=
github.com/roadrunner-server/tcplisten v1.3.0/go.mod h1:VR6Ob5am0oEuLMOeLiVvQxG9ShykAEgrlvZddX8EfoU=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0/go.mod h1:tcTUAlmO8nuInPDSBVfG+CP6Mzjy5+gNV4mPxMbL0IA=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
Expand All @@ -55,10 +54,15 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM=
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
48 changes: 27 additions & 21 deletions natsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (

"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/errors"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -37,13 +36,14 @@ type Configurer interface {
type Driver struct {
// system
log *zap.Logger
queue pq.Queue
queue jobs.Queue
tracer *sdktrace.TracerProvider
prop propagation.TextMapPropagator
listeners uint32
pipeline atomic.Pointer[jobs.Pipeline]
consumeAll bool
stopCh chan struct{}
stopped uint64

// nats
conn *nats.Conn
Expand All @@ -62,7 +62,7 @@ type Driver struct {
deleteStreamOnStop bool
}

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipe jobs.Pipeline, pq pq.Queue) (*Driver, error) {
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipe jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_nats_consumer")

if !cfg.Has(configKey) {
Expand Down Expand Up @@ -134,11 +134,12 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
}

cs := &Driver{
tracer: tracer,
prop: prop,
log: log,
stopCh: make(chan struct{}),
queue: pq,
tracer: tracer,
prop: prop,
log: log,
stopCh: make(chan struct{}),
stopped: 0,
queue: pq,

conn: conn,
js: js,
Expand All @@ -159,7 +160,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
return cs, nil
}

func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_nats_pipeline_consumer")

// if no global section -- error
Expand Down Expand Up @@ -222,11 +223,12 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
}

cs := &Driver{
tracer: tracer,
prop: prop,
log: log,
queue: pq,
stopCh: make(chan struct{}),
tracer: tracer,
prop: prop,
log: log,
queue: pq,
stopCh: make(chan struct{}),
stopped: 0,

conn: conn,
js: js,
Expand All @@ -247,7 +249,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
return cs, nil
}

func (c *Driver) Push(ctx context.Context, job jobs.Job) error {
func (c *Driver) Push(ctx context.Context, job jobs.Message) error {
const op = errors.Op("nats_consumer_push")

ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "nats_push")
Expand All @@ -258,7 +260,7 @@ func (c *Driver) Push(ctx context.Context, job jobs.Job) error {
}

j := fromJob(job)
c.prop.Inject(ctx, propagation.HeaderCarrier(j.Headers))
c.prop.Inject(ctx, propagation.HeaderCarrier(j.headers))

data, err := json.Marshal(j)
if err != nil {
Expand Down Expand Up @@ -407,6 +409,11 @@ func (c *Driver) Stop(ctx context.Context) error {
_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "nats_stop")
defer span.End()

atomic.StoreUint64(&c.stopped, 1)
pipe := *c.pipeline.Load()
// remove all items associated with the current pipeline
_ = c.queue.Remove(pipe.Name())

if atomic.LoadUint32(&c.listeners) > 0 {
if c.sub != nil {
err := c.sub.Drain()
Expand All @@ -425,7 +432,6 @@ func (c *Driver) Stop(ctx context.Context) error {
}
}

pipe := *c.pipeline.Load()
err := c.conn.Drain()
if err != nil {
return err
Expand Down Expand Up @@ -483,15 +489,15 @@ func ready(r uint32) bool {
return r > 0
}

func fromJob(job jobs.Job) *Item {
func fromJob(job jobs.Message) *Item {
return &Item{
Job: job.Name(),
Ident: job.ID(),
Payload: job.Payload(),
Headers: job.Headers(),
headers: job.Headers(),
Options: &Options{
Priority: job.Priority(),
Pipeline: job.Pipeline(),
Pipeline: job.PipelineID(),
Delay: job.Delay(),
AutoAck: job.AutoAck(),
},
Expand Down
46 changes: 37 additions & 9 deletions natsjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package natsjobs

import (
stderr "errors"
"sync/atomic"
"time"
"unsafe"

"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
"github.com/roadrunner-server/sdk/v4/utils"
"github.com/roadrunner-server/errors"
)

type Item struct {
Expand All @@ -17,7 +19,7 @@ type Item struct {
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
Headers map[string][]string `json:"headers"`
headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
Expand All @@ -37,6 +39,7 @@ type Options struct {

// private
deleteAfterAck bool
stopped *uint64
requeueFn func(*Item) error
ack func(...nats.AckOpt) error
nak func(...nats.AckOpt) error
Expand All @@ -58,13 +61,17 @@ func (i *Item) Priority() int64 {
return i.Options.Priority
}

func (i *Item) Metadata() map[string][]string {
return i.Headers
func (i *Item) PipelineID() string {
return i.Options.Pipeline
}

func (i *Item) Headers() map[string][]string {
return i.headers
}

// Body packs job payload into binary payload.
func (i *Item) Body() []byte {
return utils.AsBytes(i.Payload)
return strToBytes(i.Payload)
}

// Context packs job context (job, id) into binary payload.
Expand All @@ -81,7 +88,7 @@ func (i *Item) Context() ([]byte, error) {
ID: i.Ident,
Job: i.Job,
Driver: pluginName,
Headers: i.Headers,
Headers: i.headers,
Queue: i.Options.Queue,
Pipeline: i.Options.Pipeline,
},
Expand All @@ -95,6 +102,9 @@ func (i *Item) Context() ([]byte, error) {
}

func (i *Item) Ack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
// the message already acknowledged
if i.Options.AutoAck {
return nil
Expand All @@ -116,15 +126,21 @@ func (i *Item) Ack() error {
}

func (i *Item) Nack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
if i.Options.AutoAck {
return nil
}
return i.Options.nak()
}

func (i *Item) Requeue(headers map[string][]string, _ int64) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
// overwrite the delay
i.Headers = headers
i.headers = headers

err := i.Options.requeueFn(i)
if err != nil {
Expand Down Expand Up @@ -159,6 +175,18 @@ func (i *Item) Requeue(headers map[string][]string, _ int64) error {
return nil
}

func (i *Item) Respond(_ []byte, _ string) error {
return nil
func bytesToStr(data []byte) string {
if len(data) == 0 {
return ""
}

return unsafe.String(unsafe.SliceData(data), len(data))
}

func strToBytes(data string) []byte {
if data == "" {
return nil
}

return unsafe.Slice(unsafe.StringData(data), len(data))
}
9 changes: 7 additions & 2 deletions natsjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func (c *Driver) listenerStart() { //nolint:gocognit
// set queue and pipeline
item.Options.Queue = c.stream
item.Options.Pipeline = (*c.pipeline.Load()).Name()
item.Options.stopped = &c.stopped

ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.Headers))
ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers))
ctx, span := c.tracer.Tracer(tracerName).Start(ctx, "nats_listener")

if err != nil {
Expand Down Expand Up @@ -121,7 +122,11 @@ func (c *Driver) listenerStart() { //nolint:gocognit
item.Options.nak = nil
}

c.prop.Inject(ctx, propagation.HeaderCarrier(item.Headers))
if item.headers == nil {
item.headers = make(map[string][]string, 1)
}

c.prop.Inject(ctx, propagation.HeaderCarrier(item.headers))
c.queue.Insert(item)
span.End()
case <-c.stopCh:
Expand Down
Loading

0 comments on commit 9643105

Please sign in to comment.