Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add GossiperMaker, GetGossip(), and UTs #119

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions connection_maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math/rand"
"net"
"strconv"
"time"
"unicode"
)
Expand Down Expand Up @@ -93,7 +94,7 @@ func (cm *connectionMaker) InitiateConnections(peers []string, replace bool) []e
}
if host == "" || !isAlnum(port) {
errors = append(errors, fmt.Errorf("invalid peer name %q, should be host[:port]", peer))
} else if addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%s", host, port)); err != nil {
} else if addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)); err != nil {
errors = append(errors, err)
} else {
addrs[peer] = addr
Expand Down Expand Up @@ -331,7 +332,7 @@ func (cm *connectionMaker) addPeerTargets(ourConnectedPeers peerNameSet, addTarg
// ephemeral) remote port of an inbound connection
// that some peer has. Let's try to connect on the
// weave port instead.
addTarget(fmt.Sprintf("%s:%d", ip, cm.port))
addTarget(net.JoinHostPort(ip, strconv.Itoa(cm.port)))
}
}
})
Expand Down
51 changes: 51 additions & 0 deletions gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,57 @@ func TestGossipSurrogate(t *testing.T) {
g3.checkHas(t, 1, 2)
}

type testGossiperMaker struct{}

func (t *testGossiperMaker) MakeGossiper(channelName string, router *Router) Gossiper {
return newTestGossiper()
}

func TestGossiperMaker(t *testing.T) {
// create the topology r1 <-> r2 <-> r3
r1 := newTestRouter(t, "01:00:00:01:00:00")
r2 := newTestRouter(t, "02:00:00:02:00:00")
r3 := newTestRouter(t, "03:00:00:03:00:00")

// auto-create a gossiper at the far end, but not the middle
r3.GossiperMaker = &testGossiperMaker{}

routers := []*Router{r1, r2, r3}
addTestGossipConnection(r1, r2)
addTestGossipConnection(r3, r2)
flushAndCheckTopology(t, routers, r1.tp(r2), r2.tp(r1, r3), r3.tp(r2))

// create a gossiper at the near end
g1 := newTestGossiper()
s1, err := r1.NewGossip("Test", g1)
require.NoError(t, err)

// broadcast a message from the near end, check it reaches the far end
broadcast(s1, 1)
sendPendingGossip(r1, r2, r3)

// ensure g3 has an auto-created gossip
var g3 *testGossiper
s3 := r3.GetGossip("Test")
switch s3.(type) {
case *gossipChannel:
switch s3.(*gossipChannel).gossiper.(type) {
case *testGossiper:
g3 = s3.(*gossipChannel).gossiper.(*testGossiper)
g3.checkHas(t, 1)
default:
t.Fatal("r3 did not auto-create a testGossiper using GossiperMaker")
}
default:
t.Fatal("r3 did not create a gossipChannel")
}

// send it back and check it reaches the near end
broadcast(s3, 2)
sendPendingGossip(r1, r2, r3)
g1.checkHas(t, 2)
}

type testGossiper struct {
sync.RWMutex
state map[byte]struct{}
Expand Down
24 changes: 23 additions & 1 deletion router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type Config struct {
GossipInterval *time.Duration
}

// GossiperMaker is an interface to create a Gossiper instance
type GossiperMaker interface {
MakeGossiper(channelName string, router *Router) Gossiper
}

// Router manages communication between this peer and the rest of the mesh.
// Router implements Gossiper.
type Router struct {
Expand All @@ -50,6 +55,7 @@ type Router struct {
Peers *Peers
Routes *routes
ConnectionMaker *connectionMaker
GossiperMaker GossiperMaker
gossipLock sync.RWMutex
gossipChannels gossipChannels
topologyGossip Gossip
Expand Down Expand Up @@ -144,6 +150,13 @@ func (router *Router) NewGossip(channelName string, g Gossiper) (Gossip, error)
return channel, nil
}

// GetGossip returns a GossipChannel from the router, or nil if the channel has not been seen/created
func (router *Router) GetGossip(channelName string) Gossip {
router.gossipLock.Lock()
defer router.gossipLock.Unlock()
return router.gossipChannels[channelName]
}

func (router *Router) gossipChannel(channelName string) *gossipChannel {
router.gossipLock.RLock()
channel, found := router.gossipChannels[channelName]
Expand All @@ -156,7 +169,16 @@ func (router *Router) gossipChannel(channelName string) *gossipChannel {
if channel, found = router.gossipChannels[channelName]; found {
return channel
}
channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{router: router}, router.logger)
// unknown channel - do we have a GossiperMaker?
var gossiper Gossiper
if router.GossiperMaker != nil {
// use the GossiperMaker to make the surrogate channel
gossiper = router.GossiperMaker.MakeGossiper(channelName, router)
} else {
// default surrogate channel
gossiper = &surrogateGossiper{router: router}
}
channel = newGossipChannel(channelName, router.Ourself, router.Routes, gossiper, router.logger)
channel.logf("created surrogate channel")
router.gossipChannels[channelName] = channel
return channel
Expand Down