trigger handshakes when lighthouse reply arrives (#246)
Currently, we wait until the next timer tick to act on the lighthouse's
reply to our HostQuery. This means we can easily add hundreds of
milliseconds of unnecessary delay to the handshake. To fix this, we
can introduce a channel to trigger an outbound handshake without waiting
for the next timer tick.
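
A minimal sketch of the pattern, with placeholder names and simplified
logic (the real implementation is in the handshake_manager.go changes
below; the vpnIP value and buffer size here are just illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    // run selects on both the periodic retry tick and a buffered trigger
    // channel, so a lighthouse reply can start the next handshake attempt
    // immediately instead of waiting up to one full tick interval.
    func run(trigger <-chan uint32, tryInterval time.Duration,
        handle func(vpnIP uint32, lighthouseTriggered bool)) {
        clock := time.Tick(tryInterval)
        for {
            select {
            case vpnIP := <-trigger:
                handle(vpnIP, true) // lighthouse reply: retry right away
            case <-clock:
                handle(0, false) // timer path; the real code walks a timer wheel of pending hosts
            }
        }
    }

    func main() {
        // Buffered so senders never block; if the buffer fills up, the
        // periodic timer still covers the retry.
        trigger := make(chan uint32, 64)

        go run(trigger, 100*time.Millisecond, func(vpnIP uint32, triggered bool) {
            fmt.Println("handshake attempt for", vpnIP, "triggered:", triggered)
        })

        // Non-blocking send, mirroring how a HostQueryReply handler can
        // nudge the handshake loop without risking a stall.
        select {
        case trigger <- 0x0a010102: // arbitrary example vpnIP (10.1.1.2 as uint32)
        default:
        }

        time.Sleep(300 * time.Millisecond)
    }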

A few samples of cold ping time between two hosts that require a
lighthouse lookup:

    before (v1.2.0):

    time=156 ms
    time=252 ms
    time=12.6 ms
    time=301 ms
    time=352 ms
    time=49.4 ms
    time=150 ms
    time=13.5 ms
    time=8.24 ms
    time=161 ms
    time=355 ms

    after:

    time=3.53 ms
    time=3.14 ms
    time=3.08 ms
    time=3.92 ms
    time=7.78 ms
    time=3.59 ms
    time=3.07 ms
    time=3.22 ms
    time=3.12 ms
    time=3.08 ms
    time=8.04 ms

I recommend reviewing this PR by looking at each commit individually, as
some refactoring was required that makes the diff a bit confusing when
combined together.
wadey committed Jul 22, 2020
1 parent 4645e60 commit 4756c96
Showing 5 changed files with 167 additions and 64 deletions.
3 changes: 3 additions & 0 deletions examples/config.yml
@@ -194,6 +194,9 @@ logging:
#retries: 20
# wait_rotation is the number of handshake attempts to do before starting to try non-local IP addresses
#wait_rotation: 5
# trigger_buffer is the size of the buffer channel for quickly sending handshakes
# after receiving the response for lighthouse queries
#trigger_buffer: 64

# Nebula security group configuration
firewall:
162 changes: 101 additions & 61 deletions handshake_manager.go
@@ -16,21 +16,24 @@ const (
DefaultHandshakeTryInterval = time.Millisecond * 100
DefaultHandshakeRetries = 20
// DefaultHandshakeWaitRotation is the number of handshake attempts to do before starting to use other IP addresses
DefaultHandshakeWaitRotation = 5
DefaultHandshakeWaitRotation = 5
DefaultHandshakeTriggerBuffer = 64
)

var (
defaultHandshakeConfig = HandshakeConfig{
tryInterval: DefaultHandshakeTryInterval,
retries: DefaultHandshakeRetries,
waitRotation: DefaultHandshakeWaitRotation,
tryInterval: DefaultHandshakeTryInterval,
retries: DefaultHandshakeRetries,
waitRotation: DefaultHandshakeWaitRotation,
triggerBuffer: DefaultHandshakeTriggerBuffer,
}
)

type HandshakeConfig struct {
tryInterval time.Duration
retries int
waitRotation int
tryInterval time.Duration
retries int
waitRotation int
triggerBuffer int

messageMetrics *MessageMetrics
}
@@ -42,6 +45,9 @@ type HandshakeManager struct {
outside *udpConn
config HandshakeConfig

// can be used to trigger outbound handshake for the given vpnIP
trigger chan uint32

OutboundHandshakeTimer *SystemTimerWheel
InboundHandshakeTimer *SystemTimerWheel

@@ -57,6 +63,8 @@ func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainH

config: config,

trigger: make(chan uint32, config.triggerBuffer),

OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),
InboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),

@@ -66,9 +74,15 @@

func (c *HandshakeManager) Run(f EncWriter) {
clockSource := time.Tick(c.config.tryInterval)
for now := range clockSource {
c.NextOutboundHandshakeTimerTick(now, f)
c.NextInboundHandshakeTimerTick(now)
for {
select {
case vpnIP := <-c.trigger:
l.WithField("vpnIp", IntIp(vpnIP)).Debug("HandshakeManager: triggered")
c.handleOutbound(vpnIP, f, true)
case now := <-clockSource:
c.NextOutboundHandshakeTimerTick(now, f)
c.NextInboundHandshakeTimerTick(now)
}
}
}

@@ -80,69 +94,86 @@ func (c *HandshakeManager) NextOutboundHandshakeTimerTick(now time.Time, f EncWr
break
}
vpnIP := ep.(uint32)
c.handleOutbound(vpnIP, f, false)
}
}

index, err := c.pendingHostMap.GetIndexByVpnIP(vpnIP)
if err != nil {
continue
}

hostinfo, err := c.pendingHostMap.QueryVpnIP(vpnIP)
if err != nil {
continue
}
func (c *HandshakeManager) handleOutbound(vpnIP uint32, f EncWriter, lighthouseTriggered bool) {
index, err := c.pendingHostMap.GetIndexByVpnIP(vpnIP)
if err != nil {
return
}
hostinfo, err := c.pendingHostMap.QueryVpnIP(vpnIP)
if err != nil {
return
}

// If we haven't finished the handshake and we haven't hit max retries, query
// lighthouse and then send the handshake packet again.
if hostinfo.HandshakeCounter < c.config.retries && !hostinfo.HandshakeComplete {
if hostinfo.remote == nil {
// We continue to query the lighthouse because hosts may
// come online during handshake retries. If the query
// succeeds (no error), add the lighthouse info to hostinfo
ips, err := c.lightHouse.Query(vpnIP, f)
if err == nil {
for _, ip := range ips {
hostinfo.AddRemote(ip)
}
hostinfo.ForcePromoteBest(c.mainHostMap.preferredRanges)
// If we haven't finished the handshake and we haven't hit max retries, query
// lighthouse and then send the handshake packet again.
if hostinfo.HandshakeCounter < c.config.retries && !hostinfo.HandshakeComplete {
if hostinfo.remote == nil {
// We continue to query the lighthouse because hosts may
// come online during handshake retries. If the query
// succeeds (no error), add the lighthouse info to hostinfo
ips := c.lightHouse.QueryCache(vpnIP)
// If we have no responses yet, or only one IP (the host hadn't
// finished reporting its own IPs yet), then send another query to
// the LH.
if len(ips) <= 1 {
ips, err = c.lightHouse.Query(vpnIP, f)
}
if err == nil {
for _, ip := range ips {
hostinfo.AddRemote(ip)
}
hostinfo.ForcePromoteBest(c.mainHostMap.preferredRanges)
}
} else if lighthouseTriggered {
// We were triggered by a lighthouse HostQueryReply packet, but
// we have already picked a remote for this host (this can happen
// if we are configured with multiple lighthouses). So we can skip
// this trigger and let the timerwheel handle the rest of the
// process
return
}

hostinfo.HandshakeCounter++
hostinfo.HandshakeCounter++

// We want to use the "best" calculated ip for the first 5 attempts, after that we just blindly rotate through
// all the others until we can stand up a connection.
if hostinfo.HandshakeCounter > c.config.waitRotation {
hostinfo.rotateRemote()
}
// We want to use the "best" calculated ip for the first 5 attempts, after that we just blindly rotate through
// all the others until we can stand up a connection.
if hostinfo.HandshakeCounter > c.config.waitRotation {
hostinfo.rotateRemote()
}

// Ensure the handshake is ready to avoid a race in timer tick and stage 0 handshake generation
if hostinfo.HandshakeReady && hostinfo.remote != nil {
c.messageMetrics.Tx(handshake, NebulaMessageSubType(hostinfo.HandshakePacket[0][1]), 1)
err := c.outside.WriteTo(hostinfo.HandshakePacket[0], hostinfo.remote)
if err != nil {
hostinfo.logger().WithField("udpAddr", hostinfo.remote).
WithField("initiatorIndex", hostinfo.localIndexId).
WithField("remoteIndex", hostinfo.remoteIndexId).
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
WithError(err).Error("Failed to send handshake message")
} else {
//TODO: this log line is assuming a lot of stuff around the cached stage 0 handshake packet, we should
// keep the real packet struct around for logging purposes
hostinfo.logger().WithField("udpAddr", hostinfo.remote).
WithField("initiatorIndex", hostinfo.localIndexId).
WithField("remoteIndex", hostinfo.remoteIndexId).
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
Info("Handshake message sent")
}
// Ensure the handshake is ready to avoid a race in timer tick and stage 0 handshake generation
if hostinfo.HandshakeReady && hostinfo.remote != nil {
c.messageMetrics.Tx(handshake, NebulaMessageSubType(hostinfo.HandshakePacket[0][1]), 1)
err := c.outside.WriteTo(hostinfo.HandshakePacket[0], hostinfo.remote)
if err != nil {
hostinfo.logger().WithField("udpAddr", hostinfo.remote).
WithField("initiatorIndex", hostinfo.localIndexId).
WithField("remoteIndex", hostinfo.remoteIndexId).
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
WithError(err).Error("Failed to send handshake message")
} else {
//TODO: this log line is assuming a lot of stuff around the cached stage 0 handshake packet, we should
// keep the real packet struct around for logging purposes
hostinfo.logger().WithField("udpAddr", hostinfo.remote).
WithField("initiatorIndex", hostinfo.localIndexId).
WithField("remoteIndex", hostinfo.remoteIndexId).
WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
Info("Handshake message sent")
}
}

// Readd to the timer wheel so we continue trying wait HandshakeTryInterval * counter longer for next try
// Readd to the timer wheel so we continue trying wait HandshakeTryInterval * counter longer for next try
if !lighthouseTriggered {
//l.Infoln("Interval: ", HandshakeTryInterval*time.Duration(hostinfo.HandshakeCounter))
c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval*time.Duration(hostinfo.HandshakeCounter))
} else {
c.pendingHostMap.DeleteVpnIP(vpnIP)
c.pendingHostMap.DeleteIndex(index)
}
} else {
c.pendingHostMap.DeleteVpnIP(vpnIP)
c.pendingHostMap.DeleteIndex(index)
}
}

@@ -169,6 +200,15 @@ func (c *HandshakeManager) AddVpnIP(vpnIP uint32) *HostInfo {
// We lock here and use an array to insert items to prevent locking the
// main receive thread for very long by waiting to add items to the pending map
c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval)

// If this is a static host, we don't need to wait for the HostQueryReply
// We can trigger the handshake right now
if _, ok := c.lightHouse.staticList[vpnIP]; ok {
select {
case c.trigger <- vpnIP:
default:
}
}
return hostinfo
}

50 changes: 50 additions & 0 deletions handshake_manager_test.go
@@ -103,6 +103,56 @@ func Test_NewHandshakeManagerVpnIP(t *testing.T) {
}
}

func Test_NewHandshakeManagerTrigger(t *testing.T) {
_, tuncidr, _ := net.ParseCIDR("172.1.1.1/24")
_, vpncidr, _ := net.ParseCIDR("172.1.1.1/24")
_, localrange, _ := net.ParseCIDR("10.1.1.1/24")
ip := ip2int(net.ParseIP("172.1.1.2"))
preferredRanges := []*net.IPNet{localrange}
mw := &mockEncWriter{}
mainHM := NewHostMap("test", vpncidr, preferredRanges)
lh := &LightHouse{}

blah := NewHandshakeManager(tuncidr, preferredRanges, mainHM, lh, &udpConn{}, defaultHandshakeConfig)

now := time.Now()
blah.NextOutboundHandshakeTimerTick(now, mw)

assert.Equal(t, 0, testCountTimerWheelEntries(blah.OutboundHandshakeTimer))

blah.AddVpnIP(ip)

assert.Equal(t, 1, testCountTimerWheelEntries(blah.OutboundHandshakeTimer))

// Trigger the same method the channel will
blah.handleOutbound(ip, mw, true)

// Make sure the trigger doesn't schedule another timer entry
assert.Equal(t, 1, testCountTimerWheelEntries(blah.OutboundHandshakeTimer))
hi := blah.pendingHostMap.Hosts[ip]
assert.Nil(t, hi.remote)

lh.addrMap = map[uint32][]udpAddr{
ip: {*NewUDPAddrFromString("10.1.1.1:4242")},
}

// This should trigger the hostmap to populate the hostinfo
blah.handleOutbound(ip, mw, true)
assert.NotNil(t, hi.remote)
assert.Equal(t, 1, testCountTimerWheelEntries(blah.OutboundHandshakeTimer))
}

func testCountTimerWheelEntries(tw *SystemTimerWheel) (c int) {
for _, i := range tw.wheel {
n := i.Head
for n != nil {
c++
n = n.Next
}
}
return c
}

func Test_NewHandshakeManagerVpnIPcleanup(t *testing.T) {
_, tuncidr, _ := net.ParseCIDR("172.1.1.1/24")
_, vpncidr, _ := net.ParseCIDR("172.1.1.1/24")
8 changes: 8 additions & 0 deletions lighthouse.go
@@ -30,6 +30,9 @@ type LightHouse struct {
// filters local addresses that we advertise to lighthouses
localAllowList *AllowList

// used to trigger the HandshakeManager when we receive HostQueryReply
handshakeTrigger chan<- uint32

// staticList exists to avoid having a bool in each addrMap entry
// since static should be rare
staticList map[uint32]struct{}
@@ -358,6 +361,11 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c
ans := NewUDPAddr(a.Ip, uint16(a.Port))
lh.AddRemote(n.Details.VpnIp, ans, false)
}
// Non-blocking attempt to trigger, skip if it would block
select {
case lh.handshakeTrigger <- n.Details.VpnIp:
default:
}

case NebulaMeta_HostUpdateNotification:
//Simple check that the host sent this not someone else
8 changes: 5 additions & 3 deletions main.go
@@ -318,14 +318,16 @@ func Main(config *Config, configTest bool, block bool, buildVersion string, logg
}

handshakeConfig := HandshakeConfig{
tryInterval: config.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval),
retries: config.GetInt("handshakes.retries", DefaultHandshakeRetries),
waitRotation: config.GetInt("handshakes.wait_rotation", DefaultHandshakeWaitRotation),
tryInterval: config.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval),
retries: config.GetInt("handshakes.retries", DefaultHandshakeRetries),
waitRotation: config.GetInt("handshakes.wait_rotation", DefaultHandshakeWaitRotation),
triggerBuffer: config.GetInt("handshakes.trigger_buffer", DefaultHandshakeTriggerBuffer),

messageMetrics: messageMetrics,
}

handshakeManager := NewHandshakeManager(tunCidr, preferredRanges, hostMap, lightHouse, udpServer, handshakeConfig)
lightHouse.handshakeTrigger = handshakeManager.trigger

//TODO: These will be reused for psk
//handshakeMACKey := config.GetString("handshake_mac.key", "")
