Skip to content

Commit

Permalink
Fix termination issues (#26)
Browse files Browse the repository at this point in the history
* terminate fanin on blocking write

* fix tap termination
cleanup dead code
  • Loading branch information
jandelgado committed Aug 30, 2019
1 parent 0680f02 commit da621cb
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 39 deletions.
3 changes: 1 addition & 2 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func cmdTap(ctx context.Context, tapConfig []rabtap.TapConfiguration, tlsConfig
tap := rabtap.NewAmqpTap(config.AmqpURI, tlsConfig, log)
taps = append(taps, tap)
g.Go(func() error {
err := tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel)
return err
return tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel)
})
}
g.Go(func() error {
Expand Down
16 changes: 13 additions & 3 deletions cmd/rabtap/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,19 @@ func messageReceiveLoop(ctx context.Context, messageChan rabtap.TapChannel,
return nil
}
log.Debugf("subscribe: messageReceiveLoop: new message %#+v", message)
// let the receiveFunc do the actual message processing
if err := messageReceiveFunc(message); err != nil {
log.Error(err)
tmpCh := make(rabtap.TapChannel)
go func() {
m := <-tmpCh
// let the receiveFunc do the actual message processing
if err := messageReceiveFunc(m); err != nil {
log.Error(err)
}
}()
select {
case tmpCh <- message:
case <-ctx.Done():
log.Debugf("subscribe: cancel (messageReceiveFunc)")
return nil
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/fanin.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func (s *Fanin) loop() error {
return nil
}
} else {
s.Ch <- message.Interface()
// allow a blocking write to s.Ch to be terminated
select {
case s.Ch <- message.Interface():
case _ = <-s.channels[0].Chan.Interface().(<-chan struct{}):
}
}
}
}
32 changes: 1 addition & 31 deletions pkg/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,7 @@ func (s *AmqpSubscriber) createWorkerFunc(
// messageLoop expects Fanin object, which expects array of channels.
var channels []interface{}
fanin := NewFanin(append(channels, ch))
return s.messageLoop(ctx, tapCh, fanin), nil
}
}

// messageLoop forwards incoming amqp messages from the fanin to the provided
// tapCh.
// TODO need not be "method"
// TODO pass chan instead of Fanin and using fanin.Ch
func (s *AmqpSubscriber) messageLoop(ctx context.Context, tapCh TapChannel,
fanin *Fanin) ReconnectAction {

for {

select {
case message, more := <-fanin.Ch:
if !more {
return doReconnect
}

amqpMessage, _ := message.(amqp.Delivery)
// Avoid blocking write to tapCh when e.g. on the other end of the
// channel the user pressed Ctrl+S to stop console output
select {
case tapCh <- NewTapMessage(&amqpMessage, time.Now()):
case <-ctx.Done():
return doNotReconnect
}

case <-ctx.Done():
return doNotReconnect
}
return amqpMessageLoop(ctx, tapCh, fanin.Ch), nil
}
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ func (s *AmqpTap) createWorkerFunc(
}
fanin := NewFanin(amqpChs)
defer func() { _ = fanin.Stop() }()

action := amqpMessageLoop(ctx, tapCh, fanin.Ch)

return action, nil
}
}
Expand Down

0 comments on commit da621cb

Please sign in to comment.