Skip to content

Commit

Permalink
[#70]: fix: NPE on third party payload send
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian committed Jul 20, 2023
2 parents ab00adb + 3e5a168 commit 80c58c6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/nats-io/nats.go v1.27.1
github.com/nats-io/nats.go v1.28.0
github.com/roadrunner-server/api/v4 v4.5.0
github.com/roadrunner-server/endure/v2 v2.3.0
github.com/roadrunner-server/errors v1.2.0
Expand All @@ -24,7 +24,6 @@ require (
github.com/nats-io/nats-server/v2 v2.7.4 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/nats-io/nats-server/v2 v2.7.4 h1:c+BZJ3rGzUKCBIM4IXO8uNT2u1vajGbD1kPA
github.com/nats-io/nats-server/v2 v2.7.4/go.mod h1:1vZ2Nijh8tcyNe8BDVyTviCd9NYzRbubQYiEHsvOQWc=
github.com/nats-io/nats.go v1.27.1 h1:OuYnal9aKVSnOzLQIzf7554OXMCG7KbaTkCSBHRcSoo=
github.com/nats-io/nats.go v1.27.1/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand All @@ -32,15 +34,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/roadrunner-server/api/v4 v4.5.0 h1:OUAcCwLeQbgRj7E2/6M6W2nxOnbG6XYPSS6LjW6COAQ=
github.com/roadrunner-server/api/v4 v4.5.0/go.mod h1:nzJvLrDMYT0K9hgPFmeL8dh6q2EvrJEaCHy2XRqz20c=
github.com/roadrunner-server/endure/v2 v2.2.1 h1:OkJUSd6+qqTcnl8in3bbyidEOmhO3B9uOVdR0avba28=
github.com/roadrunner-server/endure/v2 v2.2.1/go.mod h1:4eTAr3fASpdyqgFcbqVckOx68dZ4YPECecrcHvAuSdU=
github.com/roadrunner-server/endure/v2 v2.3.0 h1:ctsXL3BjcgHJ0kyO42B2QIaKeZa0modVV9jYx3qSxqo=
github.com/roadrunner-server/endure/v2 v2.3.0/go.mod h1:pvCyn4xADwnp8NSj0oWBCLSJaFSNjBQCAr0x+4a8Qq0=
github.com/roadrunner-server/errors v1.2.0 h1:qBmNXt8Iex9QnYTjCkbJKsBZu2EtYkQCM06GUDcQBbI=
github.com/roadrunner-server/errors v1.2.0/go.mod h1:z0ECxZp/dDa5RahtMcy4mBIavVxiZ9vwE5kByl7kFtY=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y=
go.opentelemetry.io/contrib/propagators/jaeger v1.17.0/go.mod h1:tcTUAlmO8nuInPDSBVfG+CP6Mzjy5+gNV4mPxMbL0IA=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
Expand Down Expand Up @@ -69,4 +67,3 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
27 changes: 27 additions & 0 deletions natsjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,37 @@ func (c *Driver) listenerStart() { //nolint:gocognit
// only JS messages
meta, err := m.Metadata()
if err != nil {
errn := m.Nak()
if errn != nil {
c.log.Error("failed to send Nak state", zap.Error(errn), zap.Error(err))
continue
}
c.log.Info("can't get message metadata", zap.Error(err))
continue
}

err = m.InProgress()
if err != nil {
errn := m.Nak()
if errn != nil {
c.log.Error("failed to send Nak state", zap.Error(errn), zap.Error(err))
continue
}
c.log.Error("failed to send InProgress state", zap.Error(err))
continue
}

item := &Item{}
err = c.unpack(m.Data, item)
if err != nil {
errn := m.Term()
if errn != nil {
c.log.Error("failed to send Term state", zap.Error(errn), zap.Error(err))
continue
}
c.log.Error("unmarshal nats payload, if you're using non RR send, consider using the `consume_all: true` option, message terminated and won't be redelivered", zap.Error(err))
continue
}

// set queue and pipeline
item.Options.Queue = c.stream
Expand All @@ -69,6 +88,14 @@ func (c *Driver) listenerStart() { //nolint:gocognit
Value: attribute.StringValue(err.Error()),
})
span.End()
// NAK the message
errn := m.Term()
if errn != nil {
c.log.Error("failed to send Term state", zap.Error(errn), zap.Error(err))
continue
}

c.log.Debug("message terminated")
continue
}

Expand Down

0 comments on commit 80c58c6

Please sign in to comment.