Skip to content

Commit

Permalink
Update tchannel inbound && transport to have atomic running var
Browse files Browse the repository at this point in the history
  • Loading branch information
Will Hughes committed Dec 15, 2016
1 parent e598367 commit d974fa2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
9 changes: 6 additions & 3 deletions transport/tchannel/channel_inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.uber.org/yarpc/internal/errors"

"github.com/opentracing/opentracing-go"
"github.com/uber/tchannel-go"
"github.com/uber-go/atomic"
)

// ChannelInbound is a TChannel Inbound backed by a pre-existing TChannel
Expand All @@ -36,6 +36,8 @@ type ChannelInbound struct {
registry transport.Registry
tracer opentracing.Tracer
transport *ChannelTransport

running atomic.Bool
}

// NewInbound returns a new TChannel inbound backed by a shared TChannel
Expand Down Expand Up @@ -76,6 +78,7 @@ func (i *ChannelInbound) Start() error {
if i.registry == nil {
return errors.NoRegistryError{}
}
i.running.Store(true)

// Set up handlers. This must occur after construction because the
// dispatcher, or its equivalent, calls SetRegistry before Start.
Expand All @@ -89,11 +92,11 @@ func (i *ChannelInbound) Start() error {

// Stop stops the TChannel outbound. This currently does nothing.
func (i *ChannelInbound) Stop() error {
i.running.Store(false)
return nil
}

// IsRunning returns whether the ChannelInbound is running.
func (i *ChannelInbound) IsRunning() bool {
// ChannelListening means the tchannel is accepting new requests at this time.
return i.ch.State() == tchannel.ChannelListening
return i.running.Load()
}
9 changes: 6 additions & 3 deletions transport/tchannel/channel_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/uber-go/atomic"
"github.com/uber/tchannel-go"
)

Expand Down Expand Up @@ -75,6 +76,8 @@ type ChannelTransport struct {
err error
addr string
tracer opentracing.Tracer

running atomic.Bool
}

// Channel returns the underlying TChannel "Channel" instance.
Expand All @@ -92,6 +95,7 @@ func (t *ChannelTransport) ListenAddr() string {
//
// All inbounds must have been assigned a registry to accept inbound requests.
func (t *ChannelTransport) Start() error {
t.running.Store(true)
// Return error deferred from constructor for the construction of a TChannel.
if t.err != nil {
return t.err
Expand Down Expand Up @@ -132,13 +136,12 @@ func (t *ChannelTransport) Start() error {
//
// Stop blocks until the program can gracefully exit.
func (t *ChannelTransport) Stop() error {
t.running.Store(false)
t.ch.Close()
return nil
}

// IsRunning returns whether the ChannelTransport is running.
func (t *ChannelTransport) IsRunning() bool {
// ChannelClosed means the tchannel client has been stopped/closed.
// ChannelClient means the tchannel client hasn't been started yet.
return t.ch.State() != tchannel.ChannelClosed && t.ch.State() != tchannel.ChannelClient
return t.running.Load()
}

0 comments on commit d974fa2

Please sign in to comment.