Skip to content

Commit

Permalink
handle failure in getting amqp chan; logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Aug 14, 2019
1 parent e3e0f98 commit 2570f12
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/amqp_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewAmqpConnector(uri string, tlsConfig *tls.Config, logger logrus.StdLogger
// Connect (re-)establishes the connection to RabbitMQ broker.
func (s *AmqpConnector) Connect(ctx context.Context, worker AmqpWorkerFunc) error {

sessions := redial(ctx, s.uri, s.tlsConfig)
sessions := redial(ctx, s.uri, s.tlsConfig, s.logger)
for session := range sessions {
s.logger.Printf("waiting for new session ...")
sub, more := <-session
Expand Down
1 change: 0 additions & 1 deletion pkg/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type PublishMessage struct {
Exchange string
RoutingKey string
Publishing *amqp.Publishing
Error *error
}

// PublishChannel is a channel for PublishMessage message objects
Expand Down
37 changes: 21 additions & 16 deletions pkg/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package rabtap
import (
"context"
"crypto/tls"
"log"
"time"

"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

// taken from streadways amqplib examples

const (
retryDelay = 3 * time.Second
)

// Session composes an amqp.Connection with an amqp.Channel
type Session struct {
*amqp.Connection
Expand All @@ -28,8 +32,12 @@ func (s *Session) NewChannel() error {
return err
}

// redial continually connects to the URL, exiting the program when no longer possible
func redial(ctx context.Context, url string, tlsConfig *tls.Config) chan chan Session {
// redial continually connects to the URL and provides a AMQP connection and
// channel in a Session struct. Closes returned chan when initial connection
// attempt fails.
func redial(ctx context.Context, url string, tlsConfig *tls.Config,
logger logrus.StdLogger) chan chan Session {

sessions := make(chan chan Session)

go func() {
Expand All @@ -41,47 +49,44 @@ func redial(ctx context.Context, url string, tlsConfig *tls.Config) chan chan Se
select {
case sessions <- sess:
case <-ctx.Done():
log.Println("shutting down session factory")
logger.Println("session: shutting down factory (cancel)")
close(sess)
return
}

// try to connect. fail early if initial connection cant be
// established.
var conn *amqp.Connection
var ch *amqp.Channel
var err error
for {
conn, err = amqp.DialTLS(url, tlsConfig)
if err == nil {
break
ch, err = conn.Channel()
if err == nil {
break
}
}
log.Printf("cannot (re)dial: %v: %q", err, url)
logger.Printf("session: cannot (re-)dial: %v: %q", err, url)
if initial {
log.Printf("initial connection failed")
close(sess)
return
}
select {
case <-ctx.Done():
log.Println("shutting down session factory")
logger.Println("session: shutting down factory (cancel)")
close(sess)
return
case <-time.After(2 * time.Second):
case <-time.After(retryDelay):
}
}

initial = false
log.Printf("connected to %s", url)

ch, err := conn.Channel()
if err != nil {
log.Printf("cannot create channel: %v", err)
}

select {
case sess <- Session{conn, ch}:
case <-ctx.Done():
log.Println("shutting down new session")
logger.Println("session: shutting down factory (cancel)")
close(sess)
return
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/session_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package rabtap
import (
"context"
"crypto/tls"
"log"
"os"
"testing"

"github.com/jandelgado/rabtap/pkg/testcommon"
Expand All @@ -18,7 +20,8 @@ func TestSessionProvidesConnectionAndChannel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{})
log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile)
sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log)

sessionFactory := <-sessions
session := <-sessionFactory
Expand All @@ -31,7 +34,8 @@ func TestSessionShutsDownProperlyWhenCancelled(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{})
log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile)
sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log)

sessionFactory, more := <-sessions
assert.True(t, more)
Expand All @@ -45,7 +49,8 @@ func TestSessionFailsEarlyWhenNoConnectionIsPossible(t *testing.T) {

ctx := context.Background()

sessions := redial(ctx, "amqp://localhost:1", &tls.Config{})
log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile)
sessions := redial(ctx, "amqp://localhost:1", &tls.Config{}, log)

sessionFactory, more := <-sessions
assert.True(t, more)
Expand All @@ -61,7 +66,8 @@ func TestSessionNewChannelReturnsNewChannel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{})
log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile)
sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log)

sessionFactory := <-sessions
session := <-sessionFactory
Expand Down
1 change: 0 additions & 1 deletion pkg/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (s *AmqpSubscriber) messageLoop(ctx context.Context, tapCh TapChannel,
for {
select {
case message, more := <-fanin.Ch:
s.logger.Printf("AmqpSubscriber: more=%v", more)
if !more {
return doReconnect
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/tap_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,9 @@ func TestIntegrationHeadersExchange(t *testing.T) {
// header is constructed.
testcommon.PublishTestMessages(t, ch, MessagesPerTest, "headers-exchange", "", amqp.Table{"header1": "test0"})

log.Println("waiting for messages to appear on tap")
requireIntFromChan(t, finishChan, MessagesPerTest)

// the original messages should also be delivered.
log.Println("receiving original messages...")
testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan)
requireIntFromChan(t, finishChan, MessagesPerTest)
}
Expand All @@ -121,11 +119,9 @@ func TestIntegrationDirectExchange(t *testing.T) {
// in the tap-exchange defined above.
testcommon.PublishTestMessages(t, ch, MessagesPerTest, "direct-exchange", "queue-0", nil)

log.Println("waiting for messages to appear on tap")
requireIntFromChan(t, finishChan, MessagesPerTest)

// the original messages should also be delivered.
log.Println("receiving original message...")
testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan)
requireIntFromChan(t, finishChan, MessagesPerTest)
}
Expand Down Expand Up @@ -154,11 +150,9 @@ func TestIntegrationTopicExchangeTapSingleQueue(t *testing.T) {
testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil)
testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil)

log.Println("waiting for messages to appear on tap")
requireIntFromChan(t, finishChan, MessagesPerTest)

// the original messages should also be delivered.
log.Println("receiving original message...")
testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan)
requireIntFromChan(t, finishChan, MessagesPerTest)

Expand Down Expand Up @@ -189,11 +183,9 @@ func TestIntegrationTopicExchangeTapWildcard(t *testing.T) {
testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil)
testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil)

log.Println("waiting for messages to appear on tap")
requireIntFromChan(t, finishChan, MessagesPerTest*2)

// the original messages should also be delivered.
log.Println("receiving original message...")
testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan)
requireIntFromChan(t, finishChan, MessagesPerTest)

Expand Down

0 comments on commit 2570f12

Please sign in to comment.