Skip to content

Commit

Permalink
test: add test for single conn disconnect during backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Apr 8, 2015
1 parent 667c739 commit 46e252c
Showing 1 changed file with 146 additions and 17 deletions.
163 changes: 146 additions & 17 deletions mock_test.go
Expand Up @@ -7,14 +7,30 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"strconv"
"testing"
"time"
)

type tbLog interface {
Log(...interface{})
}

type testLogger struct {
tbLog
}

func (tl *testLogger) Output(maxdepth int, s string) error {
tl.Log(s)
return nil
}

func newTestLogger(tbl tbLog) logger {
return &testLogger{tbl}
}

type instruction struct {
delay time.Duration
frameType int32
Expand All @@ -29,14 +45,13 @@ type mockNSQD struct {
exitChan chan int
}

func newMockNSQD(script []instruction) *mockNSQD {
func newMockNSQD(script []instruction, addr string) *mockNSQD {
n := &mockNSQD{
script: script,
exitChan: make(chan int),
}

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
tcpListener, err := net.Listen("tcp", addr.String())
tcpListener, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err)
}
Expand Down Expand Up @@ -147,6 +162,7 @@ func (n *mockNSQD) handle(conn net.Conn) {

exit:
n.tcpListener.Close()
conn.Close()
}

func framedResponse(frameType int32, data []byte) []byte {
Expand Down Expand Up @@ -174,18 +190,17 @@ func framedResponse(frameType int32, data []byte) []byte {
type testHandler struct{}

func (h *testHandler) HandleMessage(message *Message) error {
if bytes.Equal(message.Body, []byte("requeue")) {
switch string(message.Body) {
case "requeue":
message.Requeue(-1)
return nil
}
if bytes.Equal(message.Body, []byte("requeue_no_backoff_1")) {
case "requeue_no_backoff_1":
if message.Attempts > 1 {
return nil
}
message.RequeueWithoutBackoff(-1)
return nil
}
if bytes.Equal(message.Body, []byte("bad")) {
case "bad":
return errors.New("bad")
}
return nil
Expand All @@ -198,8 +213,6 @@ func frameMessage(m *Message) []byte {
}

func TestConsumerBackoff(t *testing.T) {
logger := log.New(ioutil.Discard, "", log.LstdFlags)

msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgGood := NewMessage(msgIDGood, []byte("good"))

Expand All @@ -221,14 +234,16 @@ func TestConsumerBackoff(t *testing.T) {
// needed to exit test
instruction{200 * time.Millisecond, -1, []byte("exit")},
}
n := newMockNSQD(script)

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_consumer_commands" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 5
config.BackoffMultiplier = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(logger, LogLevelDebug)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
Expand Down Expand Up @@ -272,8 +287,6 @@ func TestConsumerBackoff(t *testing.T) {
}

func TestConsumerRequeueNoBackoff(t *testing.T) {
// logger := log.New(ioutil.Discard, "", log.LstdFlags)

msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeueNoBackoff := MessageID{'r', 'e', 'q', 'n', 'b', 'a', 'c', 'k', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
Expand All @@ -293,14 +306,16 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}
n := newMockNSQD(script)

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 1
config.BackoffMultiplier = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
// q.SetLogger(logger, LogLevelDebug)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
Expand Down Expand Up @@ -341,3 +356,117 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
}
}
}

func TestConsumerBackoffDisconnect(t *testing.T) {
msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}

msgGood := NewMessage(msgIDGood, []byte("good"))
msgRequeue := NewMessage(msgIDRequeue, []byte("requeue"))

script := []instruction{
// SUB
instruction{0, FrameTypeResponse, []byte("OK")},
// IDENTIFY
instruction{0, FrameTypeResponse, []byte("OK")},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}

addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
n := newMockNSQD(script, addr.String())

topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix()))
config := NewConfig()
config.MaxInFlight = 5
config.BackoffMultiplier = 10 * time.Millisecond
config.LookupdPollInterval = 10 * time.Millisecond
config.RDYRedistributeInterval = 10 * time.Millisecond
q, _ := NewConsumer(topicName, "ch", config)
q.SetLogger(newTestLogger(t), LogLevelDebug)
q.AddHandler(&testHandler{})
err := q.ConnectToNSQD(n.tcpAddr.String())
if err != nil {
t.Fatalf(err.Error())
}

select {
case <-n.exitChan:
log.Printf("clean exit")
case <-time.After(500 * time.Millisecond):
log.Printf("timeout")
}

for i, r := range n.got {
log.Printf("%d: %s", i, r)
}

expected := []string{
"IDENTIFY",
"SUB " + topicName + " ch",
"RDY 5",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDRequeue),
"RDY 1",
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDRequeue),
"RDY 1",
"RDY 0",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 1",
}
if len(n.got) != len(expected) {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
}
for i, r := range n.got {
if string(r) != expected[i] {
t.Fatalf("cmd %d bad %s != %s", i, r, expected[i])
}
}

script = []instruction{
// SUB
instruction{0, FrameTypeResponse, []byte("OK")},
// IDENTIFY
instruction{0, FrameTypeResponse, []byte("OK")},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)},
// needed to exit test
instruction{100 * time.Millisecond, -1, []byte("exit")},
}

n = newMockNSQD(script, n.tcpAddr.String())

select {
case <-n.exitChan:
log.Printf("clean exit")
case <-time.After(500 * time.Millisecond):
log.Printf("timeout")
}

for i, r := range n.got {
log.Printf("%d: %s", i, r)
}

expected = []string{
"IDENTIFY",
"SUB " + topicName + " ch",
"RDY 1",
"RDY 5",
fmt.Sprintf("FIN %s", msgIDGood),
fmt.Sprintf("FIN %s", msgIDGood),
}
if len(n.got) != len(expected) {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
}
for i, r := range n.got {
if string(r) != expected[i] {
t.Fatalf("cmd %d bad %s != %s", i, r, expected[i])
}
}
}

0 comments on commit 46e252c

Please sign in to comment.