Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add nats queue,support default and jetstream mode #64

Merged
merged 3 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions dq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ type (
}
)

var rng *rand.Rand

func init() {
rand.Seed(time.Now().UnixNano())
source := rand.NewSource(time.Now().UnixNano())
rng = rand.New(source)
}

func NewProducer(beanstalks []Beanstalk) Producer {
Expand Down Expand Up @@ -117,7 +120,7 @@ func (p *producerCluster) getWriteNodes() []Producer {
}

nodes := p.cloneNodes()
rand.Shuffle(len(nodes), func(i, j int) {
rng.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
return nodes[:replicaNodes]
Expand Down
60 changes: 60 additions & 0 deletions example/natsq/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"

"github.com/zeromicro/go-queue/natsq"
)

type MyConsumer struct {
Channel string
}

func (c *MyConsumer) HandleMessage(m *natsq.Msg) error {
fmt.Printf("%s Received %s's a message: %s\n", c.Channel, m.Subject, string(m.Data))
return nil
}

func main() {

mc1 := &MyConsumer{Channel: "vipUpgrade"}
mc2 := &MyConsumer{Channel: "taskFinish"}

c := &natsq.NatsConfig{
ServerUri: "nats://127.0.0.1:4222",
}

//JetMode
// cq := []*natsq.ConsumerQueue{
// {
// Consumer: mc1,
// QueueName: "vipUpgrade",
// StreamName: "ccc",
// Subjects: []string{"ddd", "eee"},
// },
// {
// Consumer: mc2,
// QueueName: "taskFinish",
// StreamName: "ccc",
// Subjects: []string{"ccc", "eee"},
// },
// }
//q := natsq.MustNewConsumerManager(c, cq, natsq.NatJetMode)

//DefaultMode
cq := []*natsq.ConsumerQueue{
{
Consumer: mc1,
QueueName: "vipUpgrade",
Subjects: []string{"ddd", "eee"},
},
{
Consumer: mc2,
QueueName: "taskFinish",
Subjects: []string{"ccc", "eee"},
},
}
q := natsq.MustNewConsumerManager(c, cq, natsq.NatDefaultMode)
q.Start()
defer q.Stop()
}
68 changes: 68 additions & 0 deletions example/natsq/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"log"
"math/rand"
"time"

"github.com/nats-io/nats.go/jetstream"
"github.com/zeromicro/go-queue/natsq"
)

func main() {

c := natsq.NatsConfig{
ServerUri: "127.0.0.1:4222",
}

// Default Mode
p, _ := natsq.NewDefaultProducer(&c)
for i := 0; i < 3; i++ {
payload := randBody()
err := p.Publish(randSub(), payload)
if err != nil {
log.Fatalf("Error publishing message: %v", err)
} else {
log.Printf("Published message: %s", string(payload))
}
}
p.Close()

// JetMode
j, _ := natsq.NewJetProducer(&c)
j.CreateOrUpdateStream(jetstream.StreamConfig{
Name: "ccc",
Subjects: []string{"ccc", "ddd", "eee"},
Storage: jetstream.FileStorage,
NoAck: false,
})
for i := 0; i < 3; i++ {
payload := randBody()
err := j.Publish(randSub(), payload)
if err != nil {
log.Fatalf("Error publishing message: %v", err)
} else {
log.Printf("Published message: %s", string(payload))
}
}
j.Close()
}

func randSub() string {
source := rand.NewSource(time.Now().UnixNano())
// 创建一个新的随机数生成器
rng := rand.New(source)
strings := []string{"ccc", "ddd", "eee"}
randomIndex := rng.Intn(len(strings))
return strings[randomIndex]
}

func randBody() []byte {
charSet := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
length := 10
result := make([]byte, length)
for i := range result {
result[i] = charSet[rand.Intn(len(charSet))]
}
return result
}
5 changes: 3 additions & 2 deletions example/stan/publisher/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ func main() {
}

func randSub() string {
rand.Seed(time.Now().UnixNano())
source := rand.NewSource(time.Now().UnixNano())
rng := rand.New(source)
charSet := "abc"
length := 1
result := make([]byte, length)
for i := range result {
result[i] = charSet[rand.Intn(len(charSet))]
result[i] = charSet[rng.Intn(len(charSet))]
}
return string(result)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
module github.com/zeromicro/go-queue

go 1.19
go 1.20

require (
github.com/beanstalkd/go-beanstalk v0.2.0
github.com/nats-io/nats.go v1.34.1
github.com/nats-io/stan.go v0.10.4
github.com/rabbitmq/amqp091-go v1.9.0
github.com/segmentio/kafka-go v0.4.38
Expand All @@ -27,7 +28,6 @@ require (
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/nats-io/nats-server/v2 v2.9.15 // indirect
github.com/nats-io/nats-streaming-server v0.25.3 // indirect
github.com/nats-io/nats.go v1.34.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/openzipkin/zipkin-go v0.4.2 // indirect
Expand Down
11 changes: 11 additions & 0 deletions natsq/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package natsq

import (
"github.com/nats-io/nats.go"
)

type NatsConfig struct {
ServerUri string
ClientName string
Options []nats.Option
}
174 changes: 174 additions & 0 deletions natsq/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package natsq

import (
"context"
"errors"
"log"
"sync"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/queue"
)

const (
NatDefaultMode = iota
NatJetMode
)

type (
Msg struct {
Subject string
Data []byte
}

ConsumeHandle func(m *Msg) error

// ConsumeHandler Consumer interface, used to define the methods required by the consumer
ConsumeHandler interface {
HandleMessage(m *Msg) error
}

// ConsumerQueue Consumer queue, used to maintain the relationship between a consumer queue
ConsumerQueue struct {
StreamName string // stream name
QueueName string // queue name
Subjects []string // Subscribe subject
Consumer ConsumeHandler // consumer object
JetOption []jetstream.PullConsumeOpt // Jetstream configuration
}

// ConsumerManager Consumer manager for managing multiple consumer queues
ConsumerManager struct {
mutex sync.RWMutex // read-write lock
conn *nats.Conn // nats connect
mode uint // nats mode
queues []ConsumerQueue // consumer queue list
options []nats.Option // Connection configuration items
doneChan chan struct{} // close channel
}
)

// MustNewConsumerManager creates a new ConsumerManager instance.
// It connects to NATS server, registers the provided consumer queues, and returns the ConsumerManager.
// If any error occurs during the process, it logs the error and continues.
func MustNewConsumerManager(cfg *NatsConfig, cq []*ConsumerQueue, mode uint) queue.MessageQueue {
sc, err := nats.Connect(cfg.ServerUri, cfg.Options...)
if err != nil {
logx.Errorf("failed to connect nats, error: %v", err)
}
cm := &ConsumerManager{
conn: sc,
options: cfg.Options,
mode: mode,
doneChan: make(chan struct{}),
}
if len(cq) == 0 {
logx.Errorf("failed consumerQueue register to nats, error: cq len is 0")
}
for _, item := range cq {
err = cm.registerQueue(item)
if err != nil {
logx.Errorf("failed to register nats, error: %v", err)
}
}

return cm
}

// Start starts consuming messages from all the registered consumer queues.
// It launches a goroutine for each consumer queue to subscribe and process messages.
// The method blocks until the doneChan is closed.
func (cm *ConsumerManager) Start() {
cm.mutex.RLock()
defer cm.mutex.RUnlock()

if len(cm.queues) == 0 {
logx.Errorf("no consumer queues found")
}
for _, consumerQueue := range cm.queues {
go cm.subscribe(consumerQueue)
}
<-cm.doneChan
}

// Stop closes the NATS connection and stops the ConsumerManager.
func (cm *ConsumerManager) Stop() {
if cm.conn != nil {
cm.conn.Close()
}
}

// registerQueue registers a new consumer queue with the ConsumerManager.
// It validates the required fields of the ConsumerQueue and adds it to the list of queues.
// If any required field is missing, it returns an error.
func (cm *ConsumerManager) registerQueue(queue *ConsumerQueue) error {
cm.mutex.Lock()
defer cm.mutex.Unlock()

if cm.mode == NatJetMode && queue.StreamName == "" {
return errors.New("stream name is required")
}

if queue.QueueName == "" {
return errors.New("queue name is required")
}
if len(queue.Subjects) == 0 {
return errors.New("subject is required")
}
if queue.Consumer == nil {
return errors.New("consumer is required")
}

cm.queues = append(cm.queues, *queue)
return nil
}

// subscribe subscribes to the specified consumer queue and starts processing messages.
// If the NATS mode is NatJetMode, it creates a JetStream consumer and consumes messages using the provided options.
// If the NATS mode is NatDefaultMode, it subscribes to the specified subjects using the queue name.
// The method blocks until the doneChan is closed.
func (cm *ConsumerManager) subscribe(queue ConsumerQueue) {
ctx := context.Background()
if cm.mode == NatJetMode {
js, _ := jetstream.New(cm.conn)
stream, err := js.Stream(ctx, "ccc")
if err != nil {
log.Fatalf("Error creating stream: %v", err)
return
}
consumer, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: queue.QueueName,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: queue.Subjects,
})
consContext, subErr := consumer.Consume(func(msg jetstream.Msg) {
err := queue.Consumer.HandleMessage(&Msg{Subject: msg.Subject(), Data: msg.Data()})
if err != nil {
logx.Errorf("error handling message: %v", err.Error())
} else {
msg.Ack()
}
}, queue.JetOption...)
if subErr != nil {
logx.Errorf("error subscribing to queue %s: %v", queue.QueueName, subErr.Error())
return
}
defer consContext.Stop()
}
if cm.mode == NatDefaultMode {
for _, subject := range queue.Subjects {
cm.conn.QueueSubscribe(subject, queue.QueueName, func(m *nats.Msg) {
err := queue.Consumer.HandleMessage(&Msg{Subject: m.Subject, Data: m.Data})
if err != nil {
logx.Errorf("error handling message: %v", err.Error())
} else {
m.Ack()
}
})
}
}

<-cm.doneChan
}
Loading
Loading