Skip to content

Commit

Permalink
refactor to flexible addresses
Browse files Browse the repository at this point in the history
Signed-off-by: Seán C McCord <ulexus@gmail.com>
  • Loading branch information
Ulexus committed Jul 11, 2021
1 parent cd02b5a commit 924fed4
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 92 deletions.
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func Add(rootURL, clusterID string, n *types.Node) error {
return nil
}

// AddKnownEndpoints adds a list of known-good endpoints to a node.
func AddKnownEndpoints(rootURL, clusterID string, id string, epList ... *types.KnownEndpoint) error {
// AddAddresses adds a list of addresses to a node.
func AddAddresses(rootURL, clusterID string, id string, epList ... *types.Address) error {
buf := new(bytes.Buffer)

if err := json.NewEncoder(buf).Encode(epList); err != nil {
Expand Down
53 changes: 9 additions & 44 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type DB interface {
// Add adds a set of known Endpoints to a node, creating the node, if it does not exist.
Add(cluster string, n *types.Node) error

// AddKnownEndpoints adds a set of known-good endpoints for a node.
AddKnownEndpoints(cluster string, id string, ep ...*types.KnownEndpoint) error
// AddAddresses adds a set of addresses for a node.
AddAddresses(cluster string, id string, ep ...*types.Address) error

// Get returns the details of the node.
Get(cluster string, id string) (*types.Node,error)
Expand Down Expand Up @@ -44,37 +44,18 @@ func (d *ramDB) Add(cluster string, n *types.Node) error {
d.db[cluster] = c
}

existingNode, ok := c[n.ID]
if !ok {
c[n.ID] = n
return nil
}

var found bool
if existing, ok := c[n.ID]; ok {
existing.AddAddresses(n.Addresses...)

existingNode.Name = n.Name
existingNode.ID = n.ID
existingNode.IP = n.IP

for _, ep := range n.KnownEndpoints {
found = false

for _, existing := range existingNode.KnownEndpoints {
if existing == ep {
found = true
break
}
}
return nil
}

if !found {
existingNode.KnownEndpoints = append(existingNode.KnownEndpoints, ep)
}
}
c[n.ID] = n

return nil
}

func (d *ramDB) AddKnownEndpoints(cluster string, id string, knownEndpoints ...*types.KnownEndpoint) error {
func (d *ramDB) AddAddresses(cluster string, id string, addresses ...*types.Address) error {
d.mu.Lock()
defer d.mu.Unlock()

Expand All @@ -88,23 +69,7 @@ func (d *ramDB) AddKnownEndpoints(cluster string, id string, knownEndpoints ...*
return fmt.Errorf("node does not exist")
}

for _, ep := range knownEndpoints {
var found bool

for _, existing := range n.KnownEndpoints {
if ep.Endpoint == existing.Endpoint {
found = true

existing.LastConnected = ep.LastConnected

break
}
}

if !found {
n.KnownEndpoints = append(n.KnownEndpoints, ep)
}
}
n.AddAddresses(addresses...)

return nil
}
Expand Down
62 changes: 26 additions & 36 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
var listenAddr = ":3000"
var devMode bool

const defaultPort = 5000

var nodeDB db.DB

func init() {
Expand All @@ -39,7 +41,7 @@ func main() {
}
}

defer logger.Sync()
defer logger.Sync() // nolint: errcheck

nodeDB = db.New()

Expand Down Expand Up @@ -99,24 +101,18 @@ func main() {
zap.String("cluster", c.Params("cluster", "")),
zap.String("node", n.ID),
zap.String("ip", n.IP.String()),
zap.Strings("endpoints", func() (out []string) {
for _, ep := range n.KnownEndpoints {
if !ep.Endpoint.IsZero() {
out = append(out, ep.Endpoint.String())
}
}
return out
}()),
zap.Strings("addresses", addressToString(n.Addresses)),
zap.Error(err),
)

return c.JSON(n)
})

// PUT addresses to a Node
app.Put("/:cluster/:node", func(c *fiber.Ctx) error {
var knownEndpoints []*types.KnownEndpoint
var addresses []*types.Address

if err := c.BodyParser(&knownEndpoints); err != nil {
if err := c.BodyParser(&addresses); err != nil {
logger.Error("failed to parse node PUT",
zap.String("cluster", c.Params("cluster", "")),
zap.String("node", c.Params("node", "")),
Expand All @@ -134,18 +130,11 @@ func main() {
)
}

if err := nodeDB.AddKnownEndpoints(c.Params("cluster", ""), node, knownEndpoints...); err != nil {
if err := nodeDB.AddAddresses(c.Params("cluster", ""), node, addresses...); err != nil {
logger.Error("failed to add known endpoints",
zap.String("cluster", c.Params("cluster", "")),
zap.String("node", node),
zap.Strings("endpoints", func() (out []string) {
for _, ep := range knownEndpoints {
if !ep.Endpoint.IsZero() {
out = append(out, ep.Endpoint.String())
}
}
return out
}()),
zap.Strings("addresses", addressToString(addresses)),
zap.Error(err),
)
return c.SendStatus(http.StatusInternalServerError)
Expand All @@ -170,14 +159,7 @@ func main() {
zap.String("cluster", c.Params("cluster", "")),
zap.String("node", n.ID),
zap.String("ip", n.IP.String()),
zap.Strings("endpoints", func() (out []string) {
for _, ep := range n.KnownEndpoints {
if !ep.Endpoint.IsZero() {
out = append(out, ep.Endpoint.String())
}
}
return out
}()),
zap.Strings("addresses", addressToString(n.Addresses)),
zap.Error(err),
)
return c.SendStatus(http.StatusInternalServerError)
Expand All @@ -187,14 +169,7 @@ func main() {
zap.String("cluster", c.Params("cluster", "")),
zap.String("node", n.ID),
zap.String("ip", n.IP.String()),
zap.Strings("endpoints", func() (out []string) {
for _, ep := range n.KnownEndpoints {
if !ep.Endpoint.IsZero() {
out = append(out, ep.Endpoint.String())
}
}
return out
}()),
zap.Strings("addresses", addressToString(n.Addresses)),
)

return c.SendStatus(http.StatusNoContent)
Expand All @@ -204,3 +179,18 @@ func main() {
zap.Error(app.Listen(listenAddr)),
)
}

func addressToString(addresses []*types.Address) (out []string) {
for _, a := range addresses {
ep, err := a.Endpoint(defaultPort)
if err != nil {
out = append(out, err.Error())

continue
}

out = append(out, ep.String())
}

return out
}
125 changes: 115 additions & 10 deletions types/types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,71 @@
package types

import (
"fmt"
"net"
"sync"
"time"

"inet.af/netaddr"
)

// Address describes an IP or DNS address with optional Port.
type Address struct {
// DNSName is the DNS name of this NodeAddress, if known.
Name string

// IP is the IP address of this NodeAddress, if known.
IP netaddr.IP

// Port is the port number for this NodeAddress, if known.
Port uint16

// LastReported indicates the time at which this address was last reported.
LastReported time.Time
}

// EqualHost indicates whether two addresses have the same host portion, ignoring the ports.
func (a *Address) EqualHost(other *Address) bool {
if ! a.IP.IsZero() || ! other.IP.IsZero() {
return a.IP == other.IP
}

return a.Name == other.Name
}

// Equal indicates whether two addresses are equal.
func (a *Address) Equal(other *Address) bool {
if !a.EqualHost(other) {
return false
}

return a.Port == other.Port
}

// Endpoint returns a UDP endpoint address for the Address, using the defaultPort if none is known.
func (a *Address) Endpoint(defaultPort uint16) (*net.UDPAddr, error) {
proto := "udp"
addr := a.Name
port := a.Port

if !a.IP.IsZero() {
addr = a.IP.String()

if a.IP.Is6() {
proto = "udp6"
addr = "[" + addr + "]"
} else {
proto = "udp4"
}
}

if port == 0 {
port = defaultPort
}

return net.ResolveUDPAddr(proto, fmt.Sprintf("%s:%d", addr, port))
}

// Node describes a Wireguard Peer
type Node struct {
// Name is the human-readable identifier of this Node.
Expand All @@ -17,20 +77,65 @@ type Node struct {
// Usually, this is the Wireguard Public Key of the Node.
ID string `json:"id,omitempty"`

// IP is the Wireguard interface IP of this Node.
// IP is the IP address of the Wireguard interface on this Node.
IP netaddr.IP `json:"ip,omitempty"`

// KnownEndpoints is a list of known endpoints (host:port) for this Node.
KnownEndpoints []*KnownEndpoint `json:"knownEndpoints,omitempty"`
// Addresses is a list of addresses for the Node.
Addresses []*Address `json:"selfIPs,omitempty"`

// SelfAddresses is a list of addresses assigned to the Node itself, either directly or via NAT.
SelfIPs []string `json:"selfIPs,omitempty"`
mu sync.Mutex
}

type KnownEndpoint struct {
// Endpoint describes the IP:Port of the known-good connection
Endpoint netaddr.IPPort `json:"endpoint"`
// AddAddresses adds a set of addresses to a Node.
func (n *Node) AddAddresses(addresses ... *Address) {
n.mu.Lock()
defer n.mu.Unlock()

for _, a := range addresses {
var found bool

if a.LastReported.IsZero() {
a.LastReported = time.Now()
}

for _, existing := range n.Addresses {
if a.EqualHost(existing) {
found = true

if a.Port > 0 {
existing.Port = a.Port
}

existing.LastReported = a.LastReported

break
}
}

if !found {
n.Addresses = append(n.Addresses, a)
}
}
}

// ExpireAddressesOlderThan removes addresses from the Node which have not been reported within the given timeframe.
func (n *Node) ExpireAddressesOlderThan(maxAge time.Duration) {
n.mu.Lock()
defer n.mu.Unlock()

i := 0

for _, a := range n.Addresses {
if time.Since(a.LastReported) < maxAge {
n.Addresses[i] = a

i++

continue
}

a = nil
}

// LastConnected records the time at which the endpoint was last reported to be good.
LastConnected time.Time `json:"lastConnected"`
n.Addresses = n.Addresses[:i]
}

0 comments on commit 924fed4

Please sign in to comment.