Skip to content

Commit

Permalink
Merge pull request #1451 from dpw/sleeve-compat
Browse files Browse the repository at this point in the history
Restore sleeve compatibility with 1.1
  • Loading branch information
awh committed Oct 1, 2015
2 parents 58dec73 + dc275d7 commit b5e7e9f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 51 deletions.
8 changes: 4 additions & 4 deletions router/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ func (conn *LocalConnection) forwarderCrypto() *OverlayCrypto {
}
}

func (conn *LocalConnection) sendOverlayControlMessage(msg []byte) error {
return conn.sendProtocolMsg(ProtocolMsg{ProtocolOverlayControlMsg, msg})
func (conn *LocalConnection) sendOverlayControlMessage(tag ProtocolTag, msg []byte) error {
return conn.sendProtocolMsg(ProtocolMsg{tag, msg})
}

type ConnectionAsForwarderListener struct{ conn *LocalConnection }
Expand Down Expand Up @@ -432,8 +432,8 @@ func (conn *LocalConnection) receiveTCP(receiver TCPReceiver) {
func (conn *LocalConnection) handleProtocolMsg(tag ProtocolTag, payload []byte) error {
switch tag {
case ProtocolHeartbeat:
case ProtocolOverlayControlMsg:
conn.forwarder.ControlMessage(payload)
case ProtocolConnectionEstablished, ProtocolFragmentationReceived, ProtocolPMTUVerified, ProtocolOverlayControlMsg:
conn.forwarder.ControlMessage(tag, payload)
case ProtocolGossipUnicast, ProtocolGossipBroadcast, ProtocolGossip:
return conn.Router.handleGossip(tag, payload)
default:
Expand Down
10 changes: 6 additions & 4 deletions router/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ForwarderParams struct {

// Function to send a control message to the counterpart
// forwarder.
SendControlMessage func([]byte) error
SendControlMessage func(tag ProtocolTag, msg []byte) error
}

// When a consumer is called, the decoder will already have been used
Expand Down Expand Up @@ -66,8 +66,10 @@ type OverlayForwarder interface {

Stop()

// Handle a message from the peer
ControlMessage([]byte)
// Handle a message from the peer. 'tag' exists for
// compatibility, and should always be
// ProtocolOverlayControlMessage for non-sleeve overlays.
ControlMessage(tag ProtocolTag, msg []byte)
}

type OverlayForwarderListener interface {
Expand Down Expand Up @@ -98,5 +100,5 @@ func (NullOverlay) Forward(ForwardPacketKey) FlowOp {
func (NullOverlay) Stop() {
}

func (NullOverlay) ControlMessage([]byte) {
func (NullOverlay) ControlMessage(ProtocolTag, []byte) {
}
5 changes: 4 additions & 1 deletion router/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,13 @@ type ProtocolTag byte

const (
ProtocolHeartbeat ProtocolTag = iota
ProtocolOverlayControlMsg
ProtocolConnectionEstablished
ProtocolFragmentationReceived
ProtocolPMTUVerified
ProtocolGossip
ProtocolGossipUnicast
ProtocolGossipBroadcast
ProtocolOverlayControlMsg
)

type ProtocolMsg struct {
Expand Down
79 changes: 37 additions & 42 deletions router/sleeve.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,14 @@ type sleeveForwarder struct {
sleeve *SleeveOverlay
remotePeer *Peer
remotePeerBin []byte
sendControlMsg func([]byte) error
sendControlMsg func(ProtocolTag, []byte) error
connUID uint64

// Channels to communicate with the aggregator goroutine
aggregatorChan chan<- aggregatorFrame
aggregatorDFChan chan<- aggregatorFrame
specialChan chan<- specialFrame
controlMsgChan chan<- controlMessage
confirmedChan chan<- struct{}
finishedChan <-chan struct{}

Expand Down Expand Up @@ -326,13 +327,18 @@ type aggregatorFrame struct {
frame []byte
}

// A "special" message over UDP, or a control message. The sender is
// nil for control messages.
// A "special" frame over UDP
type specialFrame struct {
sender *net.UDPAddr
frame []byte
}

// A control message
type controlMessage struct {
tag ProtocolTag
msg []byte
}

func (sleeve *SleeveOverlay) MakeForwarder(params ForwarderParams) (OverlayForwarder, error) {
var crypto OverlayCrypto
if params.Crypto != nil {
Expand All @@ -349,6 +355,7 @@ func (sleeve *SleeveOverlay) MakeForwarder(params ForwarderParams) (OverlayForwa
aggChan := make(chan aggregatorFrame, ChannelSize)
aggDFChan := make(chan aggregatorFrame, ChannelSize)
specialChan := make(chan specialFrame, 1)
controlMsgChan := make(chan controlMessage, 1)
confirmedChan := make(chan struct{})
finishedChan := make(chan struct{})

Expand All @@ -361,6 +368,7 @@ func (sleeve *SleeveOverlay) MakeForwarder(params ForwarderParams) (OverlayForwa
aggregatorChan: aggChan,
aggregatorDFChan: aggDFChan,
specialChan: specialChan,
controlMsgChan: controlMsgChan,
confirmedChan: confirmedChan,
finishedChan: finishedChan,
remoteAddr: params.RemoteAddr,
Expand All @@ -372,8 +380,8 @@ func (sleeve *SleeveOverlay) MakeForwarder(params ForwarderParams) (OverlayForwa
senderDF: newUDPSenderDF(params.LocalIP, sleeve.localPort),
}

go fwd.run(aggChan, aggDFChan, specialChan, confirmedChan,
finishedChan)
go fwd.run(aggChan, aggDFChan, specialChan, controlMsgChan,
confirmedChan, finishedChan)
return fwd, nil
}

Expand Down Expand Up @@ -569,9 +577,9 @@ func frameTooBig(frame []byte, mtu int) bool {
return len(frame) > mtu+EthernetOverhead
}

func (fwd *sleeveForwarder) ControlMessage(msg []byte) {
func (fwd *sleeveForwarder) ControlMessage(tag ProtocolTag, msg []byte) {
select {
case fwd.specialChan <- specialFrame{nil, msg}:
case fwd.controlMsgChan <- controlMessage{tag, msg}:
case <-fwd.finishedChan:
}
}
Expand All @@ -588,6 +596,7 @@ func (fwd *sleeveForwarder) Stop() {
func (fwd *sleeveForwarder) run(aggChan <-chan aggregatorFrame,
aggDFChan <-chan aggregatorFrame,
specialChan <-chan specialFrame,
controlMsgChan <-chan controlMessage,
confirmedChan <-chan struct{},
finishedChan chan<- struct{}) {
defer close(finishedChan)
Expand All @@ -605,14 +614,11 @@ loop:
err = fwd.aggregateAndSend(frame, aggDFChan,
fwd.crypto.EncDF, fwd.senderDF, fwd.maxPayload)

case special := <-specialChan:
if special.sender == nil {
// Control messages are sent on specialChan,
// with a nil sender
err = fwd.handleControlMsg(special.frame)
} else {
err = fwd.handleSpecialFrame(special)
}
case sf := <-specialChan:
err = fwd.handleSpecialFrame(sf)

case cm := <-controlMsgChan:
err = fwd.handleControlMessage(cm)

case _, ok := <-confirmedChan:
if !ok {
Expand Down Expand Up @@ -741,32 +747,20 @@ func (fwd *sleeveForwarder) handleSpecialFrame(special specialFrame) error {
}
}

const (
HeartbeatAck = iota
FragTestAck
MTUTestAck
)

func (fwd *sleeveForwarder) handleControlMsg(msg []byte) error {
if len(msg) == 0 {
log.Print(fwd.logPrefix(),
"Received zero-length control message")
return nil
}

switch msg[0] {
case HeartbeatAck:
func (fwd *sleeveForwarder) handleControlMessage(cm controlMessage) error {
switch cm.tag {
case ProtocolConnectionEstablished:
return fwd.handleHeartbeatAck()

case FragTestAck:
case ProtocolFragmentationReceived:
return fwd.handleFragTestAck()

case MTUTestAck:
return fwd.handleMTUTestAck(msg)
case ProtocolPMTUVerified:
return fwd.handleMTUTestAck(cm.msg)

default:
log.Print(fwd.logPrefix(),
"Ignoring unknown control message: ", msg[0])
"Ignoring unknown control message tag: ", cm.tag)
return nil
}
}
Expand Down Expand Up @@ -832,7 +826,7 @@ func (fwd *sleeveForwarder) handleHeartbeat(special specialFrame) error {

if !fwd.ackedHeartbeat {
fwd.ackedHeartbeat = true
if err := fwd.sendControlMsg([]byte{HeartbeatAck}); err != nil {
if err := fwd.sendControlMsg(ProtocolConnectionEstablished, nil); err != nil {
return err
}
}
Expand All @@ -857,6 +851,8 @@ func (fwd *sleeveForwarder) setRemoteAddr(addr *net.UDPAddr) {
}

func (fwd *sleeveForwarder) handleHeartbeatAck() error {
log.Debug(fwd.logPrefix(), "handleHeartbeatAck")

// The connection is now regarded as established
fwd.notifyEstablished()

Expand Down Expand Up @@ -899,7 +895,7 @@ func (fwd *sleeveForwarder) handleFragTest(frame []byte) error {
return nil
}

return fwd.sendControlMsg([]byte{FragTestAck})
return fwd.sendControlMsg(ProtocolFragmentationReceived, nil)
}

func (fwd *sleeveForwarder) handleFragTestAck() error {
Expand Down Expand Up @@ -943,19 +939,18 @@ func (fwd *sleeveForwarder) sendMTUTest() error {
}

func (fwd *sleeveForwarder) handleMTUTest(frame []byte) error {
buf := make([]byte, 3)
buf[0] = MTUTestAck
binary.BigEndian.PutUint16(buf[1:], uint16(len(frame)-EthernetOverhead))
return fwd.sendControlMsg(buf)
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, uint16(len(frame)-EthernetOverhead))
return fwd.sendControlMsg(ProtocolPMTUVerified, buf)
}

func (fwd *sleeveForwarder) handleMTUTestAck(msg []byte) error {
if len(msg) < 3 {
if len(msg) < 2 {
log.Print(fwd.logPrefix(), "Received truncated MTUTestAck")
return nil
}

mtu := int(binary.BigEndian.Uint16(msg[1:]))
mtu := int(binary.BigEndian.Uint16(msg))
log.Debug(fwd.logPrefix(),
"handleMTUTestAck: for mtu candidate ", mtu)
if mtu != fwd.mtuCandidate {
Expand Down

0 comments on commit b5e7e9f

Please sign in to comment.