Skip to content

Commit

Permalink
improve termination behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Dec 7, 2018
1 parent d3210c0 commit 0c66427
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

### Fixes
* fix: when publishing (`rabtap pub` messages from stdin, a single EOF (ctrl+d)
ends now the piublishing process.
ends now the publishing process
* fix: `rabtap pub` fails with error message when publishing to unknown exchange
* fix: pub, sub, and tap now fail early when there is a connection problem on
the initial connection to the broker
Expand Down
1 change: 0 additions & 1 deletion cmd/main/cmd_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func createMessageReaderFunc(jsonFormat bool, reader io.Reader) MessageReaderFun
func publishMessageStream(publishChannel rabtap.PublishChannel,
exchange, routingKey string,
readNextMessageFunc MessageReaderFunc) error {

for {
msg, more, err := readNextMessageFunc()
switch err {
Expand Down
4 changes: 2 additions & 2 deletions cmd/main/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ func cmdSubscribe(cmd CmdSubscribeArg) error {
messageChannel := make(rabtap.TapChannel)
subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, cmd.tlsConfig, log)
defer subscriber.Close()
go messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel)
return subscriber.EstablishSubscription(cmd.queue, messageChannel)
go subscriber.EstablishSubscription(cmd.queue, messageChannel)
return messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel)
}
2 changes: 2 additions & 0 deletions cmd/main/command_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ Examples:
# use RABTAP_AMQPURI environment variable to specify broker instead of --uri
export RABTAP_AMQPURI=amqp://guest:guest@localhost:5672/
echo "Hello" | rabtap pub amq.topic -r "some.key"
rabtap sub JDQ
rabtap queue create JDQ
rabtap queue bind JDQ to amq.direct --bindingkey=key
rabtap queue rm JDQ
Expand Down
2 changes: 0 additions & 2 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ func startCmdSubscribe(args CommandLineArgs) {
// signalChannel receives ctrl+C/interrput signal
signalChannel := make(chan os.Signal, 5)
signal.Notify(signalChannel, os.Interrupt)
// messageReceiveFunc receives the tapped messages, prints
// and optionally saves them.
messageReceiveFunc := createMessageReceiveFunc(
NewColorableWriter(os.Stdout), args.JSONFormat,
args.SaveDir, args.NoColor)
Expand Down
7 changes: 6 additions & 1 deletion cmd/main/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ func messageReceiveLoop(messageChan rabtap.TapChannel,

for {
select {
case message := <-messageChan:
case message, more := <-messageChan:
if !more {
log.Debug("subscribe: messageReceiveLoop: channel closed.")
return nil
}
log.Debugf("subscribe: messageReceiveLoop: new message %#+v", message)
if message.Error != nil {
// unrecoverable error received -> log and exit
Expand All @@ -35,6 +39,7 @@ func messageReceiveLoop(messageChan rabtap.TapChannel,
log.Error(err)
}
case <-signalChannel:
log.Debugf("subscribe: caught signal!")
return nil
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/amqp_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) error {
for {
// the error channel is used to detect when (re-)connect is needed will
// be closed by amqp lib when connection is gracefully shut down.
errorChan := make(chan *amqp.Error)
errorChan := make(chan *amqp.Error, 10)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // to prevent go-routine leaking

Expand All @@ -162,6 +162,7 @@ func (s *AmqpConnector) Connect(worker AmqpWorkerFunc) error {
}
if err != nil {
// connection could not be established
s.workerFinished <- err
return err
}

Expand All @@ -188,5 +189,6 @@ func (s *AmqpConnector) Close() error {
return errors.New("already closed")
}
s.controlChan <- shutdownMessage
return <-s.workerFinished
err := <-s.workerFinished
return err
}
15 changes: 7 additions & 8 deletions pkg/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type PublishMessage struct {
Exchange string
RoutingKey string
Publishing *amqp.Publishing
Error *error
}

// PublishChannel is a channel for PublishMessage message objects
Expand All @@ -40,18 +41,17 @@ func (s *AmqpPublish) Connected() bool {
return s.connection.Connected()
}

// createWorkerFunc receives messages on the provides channel and publishes
// createWorkerFunc receives messages on the provided channel and publishes
// the messages on an rabbitmq exchange
func (s *AmqpPublish) createWorkerFunc(publishChannel PublishChannel) AmqpWorkerFunc {

return func(rabbitConn *amqp.Connection, controlChan chan ControlMessage) ReconnectAction {

channel, err := rabbitConn.Channel()
if err != nil {
return doReconnect
}
defer channel.Close()
errChan := make(chan *amqp.Error)
errChan := make(chan *amqp.Error, 10)
channel.NotifyClose(errChan)

for {
Expand Down Expand Up @@ -89,14 +89,13 @@ func (s *AmqpPublish) createWorkerFunc(publishChannel PublishChannel) AmqpWorker
}
}

// EstablishConnection sets up the connection to the broker and sets up
// the tap, which is bound to the provided consumer function. Typically
// started as go-routine.
// EstablishConnection sets up the connection to the broker
func (s *AmqpPublish) EstablishConnection(publishChannel PublishChannel) error {
return s.connection.Connect(s.createWorkerFunc(publishChannel))
err := s.connection.Connect(s.createWorkerFunc(publishChannel))
return err
}

// Close closes the connection to the broker and ends tapping.
// Close closes the connection to the broker
func (s *AmqpPublish) Close() error {
return s.connection.Close()
}
6 changes: 5 additions & 1 deletion pkg/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ func (s *AmqpSubscriber) Connected() bool {
// the tap, which is bound to the provided consumer function. Typically
// this function is run as a go-routine.
func (s *AmqpSubscriber) EstablishSubscription(queueName string, tapCh TapChannel) error {
return s.connection.Connect(s.createWorkerFunc(queueName, tapCh))
err := s.connection.Connect(s.createWorkerFunc(queueName, tapCh))
if err != nil {
tapCh <- &TapMessage{nil, err}
}
return err
}

func (s *AmqpSubscriber) createWorkerFunc(
Expand Down

0 comments on commit 0c66427

Please sign in to comment.