Skip to content

Commit

Permalink
Track all channels created in a process
Browse files Browse the repository at this point in the history
Expose these channels through introspection for easier debugging
  • Loading branch information
prashantv committed Feb 2, 2016
1 parent 979ccd8 commit e08de63
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 0 deletions.
90 changes: 90 additions & 0 deletions all_channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2015 Uber Technologies, Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tchannel

import (
"bytes"
"runtime"
"strconv"
"sync"
)

// channelMap is used to ensure that applications don't create multiple channels with
// the same service name in a single process.
var channelMap = struct {
sync.Mutex
existing map[string][]*Channel
}{
existing: make(map[string][]*Channel),
}

func getCallerStack(skip int) string {
callers := make([]uintptr, 32)
n := runtime.Callers(skip+2 /* skip Callers and self */, callers)
callers = callers[:n]

buf := &bytes.Buffer{}
for _, pc := range callers {
f := runtime.FuncForPC(pc)
name := f.Name()
file, line := f.FileLine(pc)

buf.WriteString(name)
buf.WriteByte('\n')
buf.WriteString(" at ")
buf.WriteString(file)
buf.WriteByte(':')
buf.WriteString(strconv.Itoa(line))
buf.WriteByte('\n')
}

return buf.String()
}

func registerNewChannel(ch *Channel) {
serviceName := ch.ServiceName()
ch.createdStack = getCallerStack(1 /* skip self */)

channelMap.Lock()
defer channelMap.Unlock()

existing := channelMap.existing[serviceName]
channelMap.existing[serviceName] = append(existing, ch)
}

func removeClosedChannel(ch *Channel) {
channelMap.Lock()
defer channelMap.Unlock()

channels := channelMap.existing[ch.ServiceName()]
for i, v := range channels {
if v != ch {
continue
}

// Replace current index with the last element, and truncate channels.
channels[i] = channels[len(channels)-1]
channels = channels[:len(channels)-1]
break
}

channelMap.existing[ch.ServiceName()] = channels
}
62 changes: 62 additions & 0 deletions all_channels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2015 Uber Technologies, Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tchannel

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAllChannelsRegistered(t *testing.T) {
ch1_1, err := NewChannel("ch1", nil)
require.NoError(t, err, "Channel create failed")
ch1_2, err := NewChannel("ch1", nil)
require.NoError(t, err, "Channel create failed")
ch2_1, err := NewChannel("ch2", nil)
require.NoError(t, err, "Channel create failed")

state := ch1_1.IntrospectState(nil)
assert.Equal(t, 1, len(state.OtherChannels["ch1"]))
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))

ch1_2.Close()

state = ch1_1.IntrospectState(nil)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
assert.Equal(t, 1, len(state.OtherChannels["ch2"]))

ch2_2, err := NewChannel("ch2", nil)

state = ch1_1.IntrospectState(nil)
require.NoError(t, err, "Channel create failed")
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
assert.Equal(t, 2, len(state.OtherChannels["ch2"]))

ch1_1.Close()
ch2_1.Close()
ch2_2.Close()

state = ch1_1.IntrospectState(nil)
assert.Equal(t, 0, len(state.OtherChannels["ch1"]))
assert.Equal(t, 0, len(state.OtherChannels["ch2"]))
}
4 changes: 4 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ const (
type Channel struct {
channelConnectionCommon

createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
handlers *handlerMap
Expand Down Expand Up @@ -215,6 +216,8 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
ch.traceReporter = traceReporter

ch.registerInternal()

registerNewChannel(ch)
return ch, nil
}

Expand Down Expand Up @@ -622,4 +625,5 @@ func (ch *Channel) Close() {
for _, c := range connections {
c.Close()
}
removeClosedChannel(ch)
}
42 changes: 42 additions & 0 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type IntrospectionOptions struct {

// RuntimeState is a snapshot of the runtime state for a channel.
type RuntimeState struct {
// CreatedStack is the stack for how this channel was created.
CreatedStack string `json:"createdStack"`

// LocalPeer is the local peer information (service name, host-port, etc).
LocalPeer LocalPeerInfo `json:"localPeer"`

Expand All @@ -52,6 +55,9 @@ type RuntimeState struct {

// NumConnections is the number of connections stored in the channel.
NumConnections int `json:"numConnections"`

// OtherChannels is information about any other channels running in this process.
OtherChannels map[string][]OtherChannelState `json:"otherChannels,omitEmpty"`
}

// GoRuntimeStateOptions are the options used when getting Go runtime state.
Expand All @@ -60,6 +66,12 @@ type GoRuntimeStateOptions struct {
IncludeGoStacks bool `json:"includeGoStacks"`
}

// OtherChannelState is the state of other channels in the same process.
type OtherChannelState struct {
CreatedStack string `json:"createdStack"`
LocalPeer LocalPeerInfo `json:"localPeer"`
}

// GoRuntimeState is a snapshot of runtime stats from the runtime.
type GoRuntimeState struct {
MemStats runtime.MemStats `json:"memStats"`
Expand Down Expand Up @@ -128,11 +140,41 @@ func (ch *Channel) IntrospectState(opts *IntrospectionOptions) *RuntimeState {
ch.mutable.RUnlock()

return &RuntimeState{
CreatedStack: ch.createdStack,
LocalPeer: ch.PeerInfo(),
SubChannels: ch.subChannels.IntrospectState(opts),
RootPeers: ch.rootPeers().IntrospectState(opts),
Peers: ch.Peers().IntrospectList(opts),
NumConnections: conns,
OtherChannels: ch.IntrospectOthers(opts),
}
}

// IntrospectOthers returns the OtherChannelState for all other channels in this process.
func (ch *Channel) IntrospectOthers(opts *IntrospectionOptions) map[string][]OtherChannelState {
channelMap.Lock()
defer channelMap.Unlock()

states := make(map[string][]OtherChannelState)
for svc, channels := range channelMap.existing {
otherChannelStates := make([]OtherChannelState, 0, len(channels))
for _, otherChan := range channels {
if ch == otherChan {
continue
}
otherChannelStates = append(otherChannelStates, otherChan.IntrospectOtherState(opts))
}
states[svc] = otherChannelStates
}

return states
}

// IntrospectOtherState returns OtherChannelState for a channel.
func (ch *Channel) IntrospectOtherState(opts *IntrospectionOptions) OtherChannelState {
return OtherChannelState{
CreatedStack: ch.createdStack,
LocalPeer: ch.PeerInfo(),
}
}

Expand Down

0 comments on commit e08de63

Please sign in to comment.