Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: don't run redistribute without connections #132

Merged
merged 4 commits into from
Apr 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ type Config struct {
// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd poll loop. This helps evenly distribute requests even if multiple consumers
// restart at the same time
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m" default:"60s"`
//
// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

// Maximum duration when REQueueing (for doubling of deferred requeue)
Expand All @@ -128,10 +131,13 @@ type Config struct {
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

// Amount of time in seconds to wait for a message from a producer when in a state where RDY
// Duration to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // (defaults: short hostname)
Expand Down
35 changes: 23 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,8 @@ func (r *Consumer) onConnClose(c *Conn) {
// try to reconnect after a bit
go func(addr string) {
for {
r.log(LogLevelInfo, "(%s) re-connecting in 15 seconds...", addr)
time.Sleep(15 * time.Second)
r.log(LogLevelInfo, "(%s) re-connecting in %.04f seconds...", addr, r.config.LookupdPollInterval)
time.Sleep(r.config.LookupdPollInterval)
if atomic.LoadInt32(&r.stopFlag) == 1 {
break
}
Expand Down Expand Up @@ -817,7 +817,8 @@ func (r *Consumer) resume() {
// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
r.log(LogLevelWarning, "no connection available to resume")
r.log(LogLevelWarning, "backing off for %.04f seconds", 1)
r.backoff(time.Second)
return
}
Expand All @@ -831,7 +832,8 @@ func (r *Consumer) resume() {
// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
r.log(LogLevelWarning, "(%s) error resuming RDY 1 - %s", choice.String(), err)
r.log(LogLevelWarning, "backing off for %.04f seconds", 1)
r.backoff(time.Second)
return
}
Expand All @@ -848,7 +850,11 @@ func (r *Consumer) inBackoffTimeout() bool {
}

func (r *Consumer) maybeUpdateRDY(conn *Conn) {
if r.inBackoff() || r.inBackoffTimeout() {
inBackoff := r.inBackoff()
inBackoffTimeout := r.inBackoffTimeout()
if inBackoff || inBackoffTimeout {
r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
conn, inBackoff, inBackoffTimeout)
return
}

Expand All @@ -868,7 +874,7 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {
}

func (r *Consumer) rdyLoop() {
redistributeTicker := time.NewTicker(5 * time.Second)
redistributeTicker := time.NewTicker(r.config.RDYRedistributeInterval)

for {
select {
Expand Down Expand Up @@ -949,24 +955,29 @@ func (r *Consumer) redistributeRDY() {
return
}

numConns := int32(len(r.conns()))
// if an external heuristic set needRDYRedistributed we want to wait
// until we can actually redistribute to proceed
conns := r.conns()
if len(conns) == 0 {
return
}

maxInFlight := r.getMaxInFlight()
if numConns > maxInFlight {
if len(conns) > int(maxInFlight) {
r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
numConns, maxInFlight)
len(conns), maxInFlight)
atomic.StoreInt32(&r.needRDYRedistributed, 1)
}

if r.inBackoff() && numConns > 1 {
r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", numConns)
if r.inBackoff() && len(conns) > 1 {
r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
atomic.StoreInt32(&r.needRDYRedistributed, 1)
}

if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
return
}

conns := r.conns()
possibleConns := make([]*Conn, 0, len(conns))
for _, c := range conns {
lastMsgDuration := time.Now().Sub(c.LastMessageTime())
Expand Down
163 changes: 146 additions & 17 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,30 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"strconv"
"testing"
"time"
)

// tbLog is the minimal logging surface the test logger needs: any value
// with a variadic Log method (for example *testing.T) satisfies it.
type tbLog interface {
	Log(args ...interface{})
}

// testLogger adapts a value with a Log method to the Output-style logger
// that the tests pass to SetLogger, so log lines are routed through the
// wrapped Log method.
type testLogger struct {
	tbLog
}

// Output forwards s to the embedded Log method. maxdepth is accepted only to
// match the logger signature and is ignored; the returned error is always nil.
func (l *testLogger) Output(maxdepth int, s string) error {
	l.tbLog.Log(s)
	return nil
}

// newTestLogger wraps tbl (in the tests, a *testing.T) in a testLogger so it
// can be handed to a consumer's SetLogger.
func newTestLogger(tbl tbLog) logger {
	return &testLogger{tbLog: tbl}
}

type instruction struct {
delay time.Duration
frameType int32
Expand All @@ -29,14 +45,13 @@ type mockNSQD struct {
exitChan chan int
}

func newMockNSQD(script []instruction) *mockNSQD {
func newMockNSQD(script []instruction, addr string) *mockNSQD {
n := &mockNSQD{
script: script,
exitChan: make(chan int),
}

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
tcpListener, err := net.Listen("tcp", addr.String())
tcpListener, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err)
}
Expand Down Expand Up @@ -147,6 +162,7 @@ func (n *mockNSQD) handle(conn net.Conn) {

exit:
n.tcpListener.Close()
conn.Close()
}

func framedResponse(frameType int32, data []byte) []byte {
Expand Down Expand Up @@ -174,18 +190,17 @@ func framedResponse(frameType int32, data []byte) []byte {
// testHandler is the stateless message handler that the consumer tests
// register via AddHandler.
type testHandler struct{}

func (h *testHandler) HandleMessage(message *Message) error {
if bytes.Equal(message.Body, []byte("requeue")) {
switch string(message.Body) {
case "requeue":
message.Requeue(-1)
return nil
}
if bytes.Equal(message.Body, []byte("requeue_no_backoff_1")) {
case "requeue_no_backoff_1":
if message.Attempts > 1 {
return nil
}
message.RequeueWithoutBackoff(-1)
return nil
}
if bytes.Equal(message.Body, []byte("bad")) {
case "bad":
return errors.New("bad")
}
return nil
Expand All @@ -198,8 +213,6 @@ func frameMessage(m *Message) []byte {
}

func TestConsumerBackoff(t *testing.T) {
logger := log.New(ioutil.Discard, "", log.LstdFlags)

msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgGood := NewMessage(msgIDGood, []byte("good"))

Expand All @@ -221,14 +234,16 @@ func TestConsumerBackoff(t *testing.T) {
// needed to exit test
instruction{200 * time.Millisecond, -1, []byte("exit")},
}
n := newMockNSQD(script)

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_consumer_commands" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 5
config.BackoffMultiplier = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(logger, LogLevelDebug)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
Expand Down Expand Up @@ -272,8 +287,6 @@ func TestConsumerBackoff(t *testing.T) {
}

func TestConsumerRequeueNoBackoff(t *testing.T) {
// logger := log.New(ioutil.Discard, "", log.LstdFlags)

msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeueNoBackoff := MessageID{'r', 'e', 'q', 'n', 'b', 'a', 'c', 'k', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
Expand All @@ -293,14 +306,16 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}
n := newMockNSQD(script)

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 1
config.BackoffMultiplier = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
// q.SetLogger(logger, LogLevelDebug)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
Expand Down Expand Up @@ -341,3 +356,117 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
}
}
}

// TestConsumerBackoffDisconnect checks the commands a consumer sends when its
// only nsqd goes away while the consumer is in backoff, and again after a new
// nsqd appears on the same address.
//
// Phase one scripts a mock nsqd that delivers good and requeued messages and
// then exits; the consumer is expected to drop to RDY 0 / RDY 1 probing.
// Phase two starts a fresh mock nsqd on the same address and expects the
// reconnected consumer to send RDY 1 followed by RDY 5 (the configured
// MaxInFlight) and to FIN both delivered messages.
func TestConsumerBackoffDisconnect(t *testing.T) {
	msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
	msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}

	msgGood := NewMessage(msgIDGood, []byte("good"))
	msgRequeue := NewMessage(msgIDRequeue, []byte("requeue"))

	// waitForExit blocks until the mock nsqd signals that its script has
	// finished (or 500ms pass), then dumps every command it received.
	waitForExit := func(n *mockNSQD) {
		select {
		case <-n.exitChan:
			log.Printf("clean exit")
		case <-time.After(500 * time.Millisecond):
			log.Printf("timeout")
		}

		for i, r := range n.got {
			log.Printf("%d: %s", i, r)
		}
	}

	// assertCommands fails the test unless the mock nsqd received exactly
	// the expected commands, in order.
	assertCommands := func(n *mockNSQD, expected []string) {
		if len(n.got) != len(expected) {
			t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
		}
		for i, r := range n.got {
			if string(r) != expected[i] {
				t.Fatalf("cmd %d bad %s != %s", i, r, expected[i])
			}
		}
	}

	script := []instruction{
		// SUB
		{0, FrameTypeResponse, []byte("OK")},
		// IDENTIFY
		{0, FrameTypeResponse, []byte("OK")},
		{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
		{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)},
		{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)},
		{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
		// needed to exit test
		{100 * time.Millisecond, -1, []byte("exit")},
	}

	// Port 0 lets the OS pick a free port; the chosen address is read back
	// from n.tcpAddr below.
	n := newMockNSQD(script, "127.0.0.1:0")

	topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix()))
	config := NewConfig()
	config.MaxInFlight = 5
	config.BackoffMultiplier = 10 * time.Millisecond
	// Short intervals so that reconnection and RDY redistribution happen
	// well inside the 500ms wait windows used below.
	config.LookupdPollInterval = 10 * time.Millisecond
	config.RDYRedistributeInterval = 10 * time.Millisecond
	q, err := NewConsumer(topicName, "ch", config)
	if err != nil {
		t.Fatal(err)
	}
	q.SetLogger(newTestLogger(t), LogLevelDebug)
	q.AddHandler(&testHandler{})
	if err := q.ConnectToNSQD(n.tcpAddr.String()); err != nil {
		// t.Fatal rather than t.Fatalf(err.Error()): the error text must
		// not be interpreted as a format string.
		t.Fatal(err)
	}

	waitForExit(n)
	assertCommands(n, []string{
		"IDENTIFY",
		"SUB " + topicName + " ch",
		"RDY 5",
		fmt.Sprintf("FIN %s", msgIDGood),
		"RDY 0",
		fmt.Sprintf("REQ %s 0", msgIDRequeue),
		"RDY 1",
		"RDY 0",
		fmt.Sprintf("REQ %s 0", msgIDRequeue),
		"RDY 1",
		"RDY 0",
		fmt.Sprintf("FIN %s", msgIDGood),
		"RDY 1",
	})

	script = []instruction{
		// SUB
		{0, FrameTypeResponse, []byte("OK")},
		// IDENTIFY
		{0, FrameTypeResponse, []byte("OK")},
		{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
		{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
		// needed to exit test
		{100 * time.Millisecond, -1, []byte("exit")},
	}

	// Start a new mock nsqd on the address the first one used, so the
	// consumer's reconnect attempts find it.
	n = newMockNSQD(script, n.tcpAddr.String())

	waitForExit(n)
	assertCommands(n, []string{
		"IDENTIFY",
		"SUB " + topicName + " ch",
		"RDY 1",
		"RDY 5",
		fmt.Sprintf("FIN %s", msgIDGood),
		fmt.Sprintf("FIN %s", msgIDGood),
	})
}
2 changes: 1 addition & 1 deletion producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestProducerConnection(t *testing.T) {

err := w.Publish("write_test", []byte("test"))
if err != nil {
t.Fatalf("should lazily connect")
t.Fatalf("should lazily connect - %s", err)
}

conn := w.conn.(*Conn)
Expand Down