Skip to content

Commit

Permalink
fix tap termination
Browse files Browse the repository at this point in the history
cleanup dead code
  • Loading branch information
jandelgado committed Aug 30, 2019
1 parent 46e16fb commit d288c3a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 38 deletions.
3 changes: 1 addition & 2 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func cmdTap(ctx context.Context, tapConfig []rabtap.TapConfiguration, tlsConfig
tap := rabtap.NewAmqpTap(config.AmqpURI, tlsConfig, log)
taps = append(taps, tap)
g.Go(func() error {
err := tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel)
return err
return tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel)
})
}
g.Go(func() error {
Expand Down
16 changes: 13 additions & 3 deletions cmd/rabtap/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,19 @@ func messageReceiveLoop(ctx context.Context, messageChan rabtap.TapChannel,
return nil
}
log.Debugf("subscribe: messageReceiveLoop: new message %#+v", message)
// let the receiveFunc do the actual message processing
if err := messageReceiveFunc(message); err != nil {
log.Error(err)
tmpCh := make(rabtap.TapChannel)
go func() {
m := <-tmpCh
// let the receiveFunc do the actual message processing
if err := messageReceiveFunc(m); err != nil {
log.Error(err)
}
}()
select {
case tmpCh <- message:
case <-ctx.Done():
log.Debugf("subscribe: cancel (messageReceiveFunc)")
return nil
}
}
}
Expand Down
32 changes: 1 addition & 31 deletions pkg/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,7 @@ func (s *AmqpSubscriber) createWorkerFunc(
// messageLoop expects Fanin object, which expects array of channels.
var channels []interface{}
fanin := NewFanin(append(channels, ch))
return s.messageLoop(ctx, tapCh, fanin), nil
}
}

// messageLoop forwards incoming amqp messages from the fanin to the provided
// tapCh.
// TODO need not be "method"
// TODO pass chan instead of Fanin and using fanin.Ch
func (s *AmqpSubscriber) messageLoop(ctx context.Context, tapCh TapChannel,
fanin *Fanin) ReconnectAction {

for {

select {
case message, more := <-fanin.Ch:
if !more {
return doReconnect
}

amqpMessage, _ := message.(amqp.Delivery)
// Avoid blocking write to tapCh when e.g. on the other end of the
// channel the user pressed Ctrl+S to stop console output
select {
case tapCh <- NewTapMessage(&amqpMessage, time.Now()):
case <-ctx.Done():
return doNotReconnect
}

case <-ctx.Done():
return doNotReconnect
}
return amqpMessageLoop(ctx, tapCh, fanin.Ch), nil
}
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ func (s *AmqpTap) createWorkerFunc(
}
fanin := NewFanin(amqpChs)
defer func() { _ = fanin.Stop() }()

action := amqpMessageLoop(ctx, tapCh, fanin.Ch)

return action, nil
}
}
Expand Down

0 comments on commit d288c3a

Please sign in to comment.