Skip to content

Commit

Permalink
Remove channels for async subscribers, better slow consumer, etc
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Jan 18, 2016
1 parent 5ea5077 commit 43c8fec
Show file tree
Hide file tree
Showing 9 changed files with 643 additions and 125 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
# Go client
go get github.com/nats-io/nats

# Servers
# Server

# gnatsd
go get github.com/nats-io/gnatsd

# nats-server (Ruby)
gem install nats
```

## Basic Usage
Expand All @@ -37,8 +33,12 @@ nc.Subscribe("foo", func(m *Msg) {
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Unsubscribing
sub, err := nc.Subscribe("foo", nil)
// Channel Subscriber
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg <- ch

// Unsubscribe
sub.Unsubscribe()

// Requests
Expand Down Expand Up @@ -82,12 +82,12 @@ c.Subscribe("hello", func(p *person) {
fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "585 Howard Street, San Francisco, CA"}
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribing
// Unsubscribe
sub, err := c.Subscribe("foo", nil)
...
sub.Unsubscribe()
Expand Down Expand Up @@ -167,7 +167,7 @@ ec.BindRecvChan("hello", recvCh)
sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "585 Howard Street"}
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me
Expand Down
2 changes: 1 addition & 1 deletion enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
cbValue.Call(oV)
}

return c.Conn.subscribe(subject, queue, natsCB, c.Conn.Opts.SubChanLen)
return c.Conn.subscribe(subject, queue, natsCB, c.Conn.Opts.SubChanLen, nil)
}

// FlushTimeout allows a Flush operation to have an associated timeout.
Expand Down
21 changes: 20 additions & 1 deletion examples/nats-bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -81,7 +82,7 @@ func main() {
if *numSubs > 0 {
total *= float64(*numSubs)
}
fmt.Printf("\nNATS throughput is %.f msgs/sec\n", total/delta)
fmt.Printf("\nNATS throughput is %s msgs/sec\n", commaFormat(int64(total/delta)))
}

func runPublisher(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs int) {
Expand Down Expand Up @@ -128,3 +129,21 @@ func runSubscriber(startwg, donewg *sync.WaitGroup, opts nats.Options, numMsgs i
nc.Flush()
startwg.Done()
}

func commaFormat(n int64) string {
in := strconv.FormatInt(n, 10)
out := make([]byte, len(in)+(len(in)-2+int(in[0]/'0'))/3)
if in[0] == '-' {
in, out[0] = in[1:], '-'
}
for i, j, k := len(in)-1, len(out)-1, 0; ; i, j = i-1, j-1 {
out[j] = in[i]
if i == 0 {
return string(out)
}
if k++; k == 3 {
j, k = j-1, 0
out[j] = ','
}
}
}
Loading

0 comments on commit 43c8fec

Please sign in to comment.