Skip to content

Commit

Permalink
Add options to allow overriding SendBufferSize pre process name prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
Ling Yuan committed Mar 20, 2020
1 parent 90ac437 commit cd0585b
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 1 deletion.
22 changes: 21 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -124,6 +125,12 @@ func (e errConnectionUnknownState) Error() string {
return fmt.Sprintf("connection is in unknown state: %v at %v", e.state, e.site)
}

// SendBufferSizeOverride is used for overriding per process name prefix send buffer channel size.
type SendBufferSizeOverride struct {
ProcessNamePrefix string
SendBufferSize int
}

// ConnectionOptions are options that control the behavior of a Connection
type ConnectionOptions struct {
// The frame pool, allowing better management of frame buffers. Defaults to using raw heap.
Expand All @@ -135,6 +142,10 @@ type ConnectionOptions struct {
// The size of send channel buffers. Defaults to 512.
SendBufferSize int

// Per-process name prefix override for SendBufferSize
// Note that order matters, if there are multiple matches, the first one is used.
SendBufferSizeOverrides []SendBufferSizeOverride

// The type of checksum to use when sending messages.
ChecksumType ChecksumType

Expand Down Expand Up @@ -264,6 +275,15 @@ func (co ConnectionOptions) withDefaults() ConnectionOptions {
return co
}

func (co ConnectionOptions) getSendBufferSize(processName string) int {
for _, override := range co.SendBufferSizeOverrides {
if strings.HasPrefix(processName, override.ProcessNamePrefix) {
return override.SendBufferSize
}
}
return co.SendBufferSize
}

func (ch *Channel) setConnectionTosPriority(tosPriority tos.ToS, c net.Conn) error {
tcpAddr, isTCP := c.RemoteAddr().(*net.TCPAddr)
if !isTCP {
Expand Down Expand Up @@ -311,7 +331,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
connDirection: connDirection,
opts: opts,
state: connectionActive,
sendCh: make(chan *Frame, opts.SendBufferSize),
sendCh: make(chan *Frame, opts.getSendBufferSize(remotePeer.ProcessName)),
stopCh: make(chan struct{}),
localPeerInfo: peerInfo,
remotePeerInfo: remotePeer,
Expand Down
58 changes: 58 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,64 @@ func TestLastActivityTimePings(t *testing.T) {
})
}

func TestSendBufferSize(t *testing.T) {
opts := testutils.NewOpts().SetSendBufferSize(512).SetSendBufferSizeOverrides([]SendBufferSizeOverride{
{"abc", 1024},
{"abcd", 2048},
{"xyz", 3072},
})
tests := []struct {
processName string
expectSendChCapacity int
}{
{
processName: "abc",
expectSendChCapacity: 1024,
},
{
processName: "abcd",
expectSendChCapacity: 1024,
},
{
processName: "bcd",
expectSendChCapacity: 512,
},
{
processName: "dabc",
expectSendChCapacity: 512,
},
{
processName: "dabcd",
expectSendChCapacity: 512,
},
{
processName: "abcde",
expectSendChCapacity: 1024,
},
{
processName: "xyzabc",
expectSendChCapacity: 3072,
},
}
for _, tt := range tests {
testutils.WithTestServer(t, opts.NoRelay(), func(tb testing.TB, ts *testutils.TestServer) {
client := ts.NewClient(opts.SetProcessName(tt.processName))

// Send an 'echo' to establish the connection.
testutils.RegisterEcho(ts.Server(), nil)
require.NoError(t, testutils.CallEcho(client, ts.HostPort(), ts.ServiceName(), nil))

// WithTestSever will test with and without relay.
if opts.DisableRelay {
assert.Equal(t, tt.expectSendChCapacity, getConnection(t, ts.Server(), inbound).SendChCapacity)
} else {
assert.Equal(t, tt.expectSendChCapacity, getConnection(t, ts.Relay(), inbound).SendChCapacity)
}

})
}
}

func TestInvalidTransportHeaders(t *testing.T) {
long100 := strings.Repeat("0123456789", 10)
long300 := strings.Repeat("0123456789", 30)
Expand Down
6 changes: 6 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func (o *ChannelOpts) SetSendBufferSize(bufSize int) *ChannelOpts {
return o
}

// SetSendBufferSizeOverrides sets the SendBufferOverrides in DefaultConnectionOptions.
func (o *ChannelOpts) SetSendBufferSizeOverrides(overrides []tchannel.SendBufferSizeOverride) *ChannelOpts {
o.DefaultConnectionOptions.SendBufferSizeOverrides = overrides
return o
}

// SetTosPriority set TosPriority in DefaultConnectionOptions.
func (o *ChannelOpts) SetTosPriority(tosPriority tos.ToS) *ChannelOpts {
o.DefaultConnectionOptions.TosPriority = tosPriority
Expand Down

0 comments on commit cd0585b

Please sign in to comment.