Skip to content

Commit

Permalink
implement QueueSubscribe for poison messages
Browse files Browse the repository at this point in the history
  • Loading branch information
OrMemphis committed Aug 5, 2022
1 parent 273811d commit 45a6fa1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
10 changes: 7 additions & 3 deletions background_tasks/poison_messages_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ package background_tasks
import (
"memphis-broker/broker"
"memphis-broker/handlers"
"memphis-broker/server"
)

var poisonMessagesHandler = handlers.PoisonMessagesHandler{}
var poisonMessagesHandler handlers.PoisonMessagesHandler

func ListenForPoisonMessages() {
broker.QueueSubscribe("$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>", "$memphis_poison_messages_listeners_group", poisonMessagesHandler.HandleNewMessage)
func ListenForPoisonMessages(s *server.Server) {
broker.QueueSubscribe(s,
"$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>",
"$memphis_poison_messages_listeners_group",
poisonMessagesHandler.HandleNewMessage)
}
39 changes: 21 additions & 18 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ func ValidateUserCreds(token string) error {
return notImplemented()
}

// TODO (or) remove unused function
func CreateInternalStream(s *server.Server, name string) error {
return s.MemphisAddStream(&server.StreamConfig{
Name: name,
Expand All @@ -408,26 +409,28 @@ func CreateInternalStream(s *server.Server, name string) error {
})
}

func PublishMessageToSubject(subject string, msg []byte) error {
// _, err := js.Publish(subject, msg)
// if err != nil {
// return getErrorWithoutNats(err)
// }
// return nil
return notImplemented()
}
// func PublishMessageToSubject(subject string, msg []byte) error {
// // _, err := js.Publish(subject, msg)
// // if err != nil {
// // return getErrorWithoutNats(err)
// // }
// // return nil
// return notImplemented()
// }

func CreatePullSubscriber(stream string, durable string) (*nats.Subscription, error) {
// sub, err := js.PullSubscribe(stream, durable)
// if err != nil {
// return sub, getErrorWithoutNats(err)
// }
// return sub, nil
return nil, notImplemented()
}
// func CreatePullSubscriber(stream string, durable string) (*nats.Subscription, error) {
// // sub, err := js.PullSubscribe(stream, durable)
// // if err != nil {
// // return sub, getErrorWithoutNats(err)
// // }
// // return sub, nil
// return nil, notImplemented()
// }

type MessageHandler func(subject string, msg []byte)

func QueueSubscribe(subject, queue_group_name string, cb func(msg *nats.Msg)) {
// broker.QueueSubscribe(subject, queue_group_name, cb)
func QueueSubscribe(s *server.Server, subject, queueGroupName string, cb MessageHandler) {
s.MemphisQueueSubscribeInternal(subject, queueGroupName, cb)
}

func IsConnectionAlive() bool {
Expand Down
5 changes: 2 additions & 3 deletions handlers/poison_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"time"

"github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
Expand All @@ -31,9 +30,9 @@ import (

type PoisonMessagesHandler struct{ S *server.Server }

func (pmh PoisonMessagesHandler) HandleNewMessage(msg *nats.Msg) {
func (pmh PoisonMessagesHandler) HandleNewMessage(subject string, msg []byte) {
var message map[string]interface{}
err := json.Unmarshal(msg.Data, &message)
err := json.Unmarshal(msg, &message)
if err != nil {
serv.Errorf("Error while getting notified about a poison message: " + err.Error())
return
Expand Down
16 changes: 16 additions & 0 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,19 @@ func (s *Server) MemphisGetSingleMsg(streamName string, msgSeq uint64) (*StoredM
}
return stream.getMsg(msgSeq)
}

// func (s *Server) MemphisPublishInternalMsg(subj string, msg []byte) {
// s.sendInternalMsgLocked(subj, _EMPTY_, nil, msg)
// }

func (s *Server) MemphisQueueSubscribeInternal(subj string, queueGroupName string, cb func(string, []byte)) error {
acc := s.GlobalAccount()
c := acc.ic
wcb := func(_ *subscription, _ *client, _ *Account, subject, _ string, rmsg []byte) {
cb(subject, rmsg)
}

_, err := c.processSub([]byte(subj), []byte(queueGroupName), []byte("memphis_internal"), wcb, false)

return err
}

0 comments on commit 45a6fa1

Please sign in to comment.