Queue library wrapper for widely popular queues - RabitMQ, Kafka, NSQ, NATS
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
benchmark
example
LICENSE
README.md
kafka.go
kafka_test.go
nats.go
nats_test.go
nsq.go
nsq_test.go
queue.go
rabbitmq.go
rabbitmq_test.go

README.md

AnyQ

GoDoc

This is queue library wrapper for widely popular queues. AnyQ provide one way to handle various queues.

Supporting Queues

Basic usage

Go get:

$ go get -u github.com/jaehue/anyq

Import the package:

import (
	"github.com/jaehue/anyq"
)

Create new queue:

q, _ := anyq.New("nsq", "127.0.0.1:4150")

Create consumer:

c, _ := q.NewConsumer(anyq.NsqConsumerArgs{Topic: "test", Channel: "anyq"})

Create producer:

p, _ := q.NewProducer(anyq.NsqProducerArgs{Topic: "test"})

Consume:

recvCh := make(chan *anyq.Message, 256)
c.BindRecvChan(recvCh)
for m := range recvCh {
	fmt.Println("[receive]", string(m.Body))
}

Produce:

sendCh := make(chan []byte, 1)
p.BindSendChan(sendCh)
sendCh <- []byte("test message")

Close:

q.Close()

Advanced usage

Optional setup function

set QoS and Exchange of RabbitMQ

setQos := func(q *anyq.Rabbitmq) {
	if err := q.Qos(100, 0, false); err != nil {
		log.Fatal(err)
	}
}

setExchange := func(q *anyq.Rabbitmq) {
	if err := q.ExchangeDeclare("test-ex", "direct", false, false, false, false, nil); err != nil {
		log.Fatal(err)
	}
	log.Println("declared Exchange")
}

q, err := anyq.New("rabbitmq", "amqp://guest:guest@127.0.0.1:5672/", setQos, setExchange)

set zookeeper urls of Kafka

q, err := anyq.New("kafka", "localhost:9092", func(q *anyq.Kafka) {
	q.Zookeepers = []string{"localhost:2181", "localhost:2182"}
})

Retrieve original conn object

q, err := anyq.New("nats", "nats://127.0.0.1:4222")
if err != nil {
	panic(err)
}

conn, err := q.Conn()
if err != nil {
	b.Error(err)
}
natsConn, ok := conn.(*nats.Conn)
if !ok {
	log.Fatalf("invalid conn type(%T)\n", conn)
}

natsConn.Subscribe("test", func(m *nats.Msg) {
	natsConn.Publish(m.Reply, m.Data)
	log.Println("[receive and reply]", string(m.Data))
})

Test

Prerequisite

You should install and run each queues

Unit test

$ go test github.com/jaehue/anyq
ok  	github.com/jaehue/anyq	1.136s

Benchmark test

$ go test github.com/jaehue/anyq/benchmark -test.bench=. -test.benchmem
testing: warning: no tests to run
PASS
BenchmarkKafkaAsyncProduce	  300000	      4111 ns/op	     700 B/op	      10 allocs/op
BenchmarkKafkaSyncProduce	   20000	    100699 ns/op	    3080 B/op	      58 allocs/op
BenchmarkKafkaConsume	   30000	    151092 ns/op	   27805 B/op	     405 allocs/op
BenchmarkNatsProduce	  500000	      3468 ns/op	     280 B/op	       4 allocs/op
BenchmarkNatsConsume	  200000	     10199 ns/op	    1429 B/op	      21 allocs/op
BenchmarkNatsReply	    5000	    256335 ns/op	    2128 B/op	      88 allocs/op
BenchmarkNsqProduce	  100000	     14261 ns/op	     852 B/op	      17 allocs/op
BenchmarkNsqConsume	     100	  13415530 ns/op	  824936 B/op	   17322 allocs/op
BenchmarkRabbitmqProduce	   30000	     38698 ns/op	    1739 B/op	      53 allocs/op
BenchmarkRabbitmqConsume	       1	2421170045 ns/op	97861152 B/op	 1966673 allocs/op
ok  	github.com/jaehue/anyq/benchmark	23.944s

Examples

Contributing

  1. Fork it ( https://github.com/jaehue/anyq/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

License

MIT (see LICENSE file)