Skip to content

Commit

Permalink
Set DiffServ (QoS) bit on Outbound Connections
Browse files Browse the repository at this point in the history
The QoS option is configurable using DiffServ names AF11, etc.
DiffServ names correspond with rfc4594 recommended values,
that are also implemented with iptables.
A ToS bit is Configurable using DefaultConnectionOptions.TosPriority
configuration parameter on create server.

The QoS feature was implmeented with a dual stack network in mind.
setsockopt will throw an error if the wrong address family flag
is called.

- Adding Configuration Parameter TosPriority to ConnectonOptions.
- dscpName to interger value in tos.go.
- Test dscp value mapping is compliant with RFC4594 recommened values.
- TestTosPriority in connection_test.go to verify the outgoing socket
has the correct ToS Bit set.
- Helper function IsTosPriority to determine if bit matches
  • Loading branch information
Matt Rivet authored and schallert committed Mar 17, 2017
1 parent 9246b94 commit 18b22ed
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 4 deletions.
36 changes: 34 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"time"

"github.com/uber-go/atomic"
"github.com/uber/tchannel-go/tos"
"golang.org/x/net/context"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

const (
Expand Down Expand Up @@ -129,6 +132,12 @@ type ConnectionOptions struct {

// The type of checksum to use when sending messages.
ChecksumType ChecksumType

// ToS class name marked on outbound connection. Supply a DiffServ
// compliant name, tos.AF11 tos.CS1 .. etc,
// tos.Lowdelay, tos.Throughput, tos.Reliabiltiy or tos.Lowcost
// from the tchannel tos package
TosPriority tos.ToS
}

// connectionEvents are the events that can be triggered by a connection.
Expand Down Expand Up @@ -237,7 +246,24 @@ func (co ConnectionOptions) withDefaults() ConnectionOptions {
return co
}

func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection {
func (ch *Channel) setConnectionTosPriority(tosPriority tos.ToS, c net.Conn) error {
// Verify we are using a TCP socket
if tcpAddr, ok := c.RemoteAddr().(*net.TCPAddr); ok {
// Handle dual stack listeners and set Traffic Class
if tcpAddr.IP.To16() != nil && tcpAddr.IP.To4() == nil {
if err := ipv6.NewConn(c).SetTrafficClass(int(tosPriority)); err != nil {
return err
}
} else if tcpAddr.IP.To4() != nil {
if err := ipv4.NewConn(c).SetTOS(int(tosPriority)); err != nil {
return err
}
}
}
return nil
}

func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) (*Connection, error) {
opts := ch.connectionOptions.withDefaults()

connID := _nextConnID.Inc()
Expand Down Expand Up @@ -279,6 +305,12 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
commonStatsTags: ch.commonStatsTags,
}

if tosPriority := opts.TosPriority; tosPriority > 0 {
if err := ch.setConnectionTosPriority(tosPriority, conn); err != nil {
return nil, err
}
}

c.nextMessageID.Store(initialID)
c.log = log
c.inbound.onRemoved = c.checkExchanges
Expand All @@ -295,7 +327,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str

go c.readFrames(connID)
go c.writeFrames(connID)
return c
return c, nil
}

func (c *Connection) onExchangeAdded() {
Expand Down
41 changes: 41 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package tchannel_test

import (
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/uber/tchannel-go/relay/relaytest"
"github.com/uber/tchannel-go/testutils"
"github.com/uber/tchannel-go/testutils/testreader"
"github.com/uber/tchannel-go/tos"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -831,3 +833,42 @@ func TestConnectionIDs(t *testing.T) {
assert.Equal(t, []uint32{1}, inbound, "Unexpected outbound IDs")
})
}

func TestTosPriority(t *testing.T) {
ctx, cancel := NewContext(time.Second)
defer cancel()

opts := testutils.NewOpts().SetServiceName("s1").SetTosPriority(tos.Lowdelay)
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
ts.Register(raw.Wrap(newTestHandler(t)), "echo")

outbound, err := ts.Server().BeginCall(ctx, ts.HostPort(), "s1", "echo", nil)
require.NoError(t, err, "BeginCall failed")

_, outboundNetConn := OutboundConnection(outbound)
connTosPriority, err := IsTosPriority(outboundNetConn, tos.Lowdelay)
require.NoError(t, err, "Checking TOS priority failed")
assert.Equal(t, connTosPriority, true)
_, _, _, err = raw.WriteArgs(outbound, []byte("arg2"), []byte("arg3"))
require.NoError(t, err, "Failed to write to outbound conn")
})
}

func TestUnmarshalTosPriority(t *testing.T) {
expected := tos.Lowdelay

var TosBit tos.ToS
err := TosBit.UnmarshalText([]byte("Lowdelay"))
if assert.NoError(t, err, "JSON unmarshal failed") {
assert.Equal(t, expected, TosBit, "JSON config mismatch")
}
}

func TestMarshalTosPriority(t *testing.T) {
expected := "\"Lowdelay\""

v, err := json.Marshal(tos.Lowdelay)
if assert.NoError(t, err, "Marshal failed") {
assert.Equal(t, expected, string(v), "Marshal mis match")
}
}
4 changes: 2 additions & 2 deletions preinit_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (ch *Channel) outboundHandshake(ctx context.Context, c net.Conn, outboundHP
return nil, NewWrappedSystemError(ErrCodeProtocol, err)
}

return ch.newConnection(c, 1 /* initialID */, outboundHP, remotePeer, remotePeerAddress, events), nil
return ch.newConnection(c, 1 /* initialID */, outboundHP, remotePeer, remotePeerAddress, events)
}

func (ch *Channel) inboundHandshake(ctx context.Context, c net.Conn, events connectionEvents) (_ *Connection, err error) {
Expand Down Expand Up @@ -93,7 +93,7 @@ func (ch *Channel) inboundHandshake(ctx context.Context, c net.Conn, events conn
return nil, err
}

return ch.newConnection(c, 0 /* initialID */, "" /* outboundHP */, remotePeer, remotePeerAddress, events), nil
return ch.newConnection(c, 0 /* initialID */, "" /* outboundHP */, remotePeer, remotePeerAddress, events)
}

func (ch *Channel) getInitParams() initParams {
Expand Down
7 changes: 7 additions & 0 deletions testutils/channel_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/tos"

"github.com/uber-go/atomic"
)
Expand Down Expand Up @@ -124,6 +125,12 @@ func (o *ChannelOpts) SetSendBufferSize(bufSize int) *ChannelOpts {
return o
}

// SetTosPriority set TosPriority in DefaultConnectionOptions.
func (o *ChannelOpts) SetTosPriority(tosPriority tos.ToS) *ChannelOpts {
o.DefaultConnectionOptions.TosPriority = tosPriority
return o
}

// SetTimeNow sets TimeNow in ChannelOptions.
func (o *ChannelOpts) SetTimeNow(timeNow func() time.Time) *ChannelOpts {
o.TimeNow = timeNow
Expand Down
77 changes: 77 additions & 0 deletions tos/tos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2015 Uber Technologies, Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tos

// ToS represents a const value DF, CS3 etc
// Assured Forwarding (x=class, y=drop precedence) (RFC2597)
// Class Selector (RFC 2474)
// IP Precedence (Linux Socket Compat RFC 791
type ToS uint8

// Assured Forwarding (x=class, y=drop precedence) (RFC2597)
// Class Selector (RFC 2474)

const (
// CS3 Class Selector 3
CS3 ToS = 0x18
// CS4 Class Selector 4
CS4 ToS = 0x20
// CS5 Class Selector 5
CS5 ToS = 0x28
// CS6 Class Selector 6
CS6 ToS = 0x30
// CS7 Class Selector 7
CS7 ToS = 0x38
// AF11 Assured Forward 11
AF11 ToS = 0x0a
// AF12 Assured Forward 11
AF12 ToS = 0x0c
// AF13 Assured Forward 12
AF13 ToS = 0x0e
// AF21 Assured Forward 13
AF21 ToS = 0x12
// AF22 Assured Forward 21
AF22 ToS = 0x14
// AF23 Assured Forward 22
AF23 ToS = 0x16
// AF31 Assured Forward 23
AF31 ToS = 0x1a
// AF32 Assured Forward 31
AF32 ToS = 0x1c
// AF33 Assured Forward 32
AF33 ToS = 0x1e
// AF41 Assured Forward 33
AF41 ToS = 0x22
// AF42 Assured Forward 41
AF42 ToS = 0x24
// AF43 Assured Forward 42
AF43 ToS = 0x26
// EF Expedited Forwarding (RFC 3246)
EF ToS = 0x2e
// Lowdelay 10
Lowdelay ToS = 0x10
// Throughput 8
Throughput ToS = 0x08
// Reliability 4
Reliability ToS = 0x04
// Lowcost 2
Lowcost ToS = 0x02
)
53 changes: 53 additions & 0 deletions tos/tos_enums.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package tos

import "fmt"

var (
_tosNameToValue map[string]ToS
_tosValueToName = map[ToS]string{
CS3: "CS3",
CS4: "CS4",
CS5: "CS5",
CS6: "CS6",
CS7: "CS7",
AF11: "AF11",
AF12: "AF12",
AF13: "AF13",
AF21: "AF21",
AF22: "AF22",
AF23: "AF23",
AF31: "AF31",
AF32: "AF32",
AF33: "AF33",
AF41: "AF41",
AF42: "AF42",
AF43: "AF43",
EF: "EF",
Lowdelay: "Lowdelay",
Throughput: "Throughput",
Reliability: "Reliability",
Lowcost: "Lowcost",
}
)

func init() {
_tosNameToValue = make(map[string]ToS, len(_tosValueToName))
for tos, tosString := range _tosValueToName {
_tosNameToValue[tosString] = tos
}
}

// MarshalText implements TextMarshaler from encoding
func (r ToS) MarshalText() ([]byte, error) {
return []byte(_tosValueToName[r]), nil
}

// UnmarshalText implements TextUnMarshaler from encoding
func (r *ToS) UnmarshalText(data []byte) error {
v, ok := _tosNameToValue[string(data)]
if !ok {
return fmt.Errorf("invalid ToS %q", string(data))
}
*r = v
return nil
}
17 changes: 17 additions & 0 deletions utils_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
"net"
"time"

"github.com/uber/tchannel-go/tos"
"golang.org/x/net/context"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

// MexChannelBufferSize is the size of the message exchange channel buffer.
Expand Down Expand Up @@ -67,6 +70,20 @@ func OutboundConnection(call *OutboundCall) (*Connection, net.Conn) {
return conn, conn.conn
}

// Return true if the connection matches the passed DiffServ Name.
func IsTosPriority(c net.Conn, tosPriority tos.ToS) (bool, error) {
var connTosPriority int
var err error

if c.RemoteAddr().(*net.TCPAddr).IP.To16() != nil && c.RemoteAddr().(*net.TCPAddr).IP.To4() == nil {
connTosPriority, err = ipv6.NewConn(c).TrafficClass()
} else if c.RemoteAddr().(*net.TCPAddr).IP.To4() != nil {
connTosPriority, err = ipv4.NewConn(c).TOS()
}

return (connTosPriority == int(tosPriority)), err
}

// InboundConnection returns the underlying connection for an incoming call.
func InboundConnection(call IncomingCall) (*Connection, net.Conn) {
inboundCall, ok := call.(*InboundCall)
Expand Down

0 comments on commit 18b22ed

Please sign in to comment.