Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1698 from weaveworks/1697-copy-on-write-gossip-data
Browse files Browse the repository at this point in the history
make GossipData immutable; Fixes #1697. Fixes #1450. Fixes #1452.
  • Loading branch information
bboreham committed Nov 19, 2015
2 parents de802f9 + 1500b1d commit f25af8f
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 23 deletions.
4 changes: 2 additions & 2 deletions ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,8 @@ type ipamGossipData struct {
alloc *Allocator
}

func (d *ipamGossipData) Merge(other mesh.GossipData) {
// no-op
func (d *ipamGossipData) Merge(other mesh.GossipData) mesh.GossipData {
return d // no-op
}

func (d *ipamGossipData) Encode() [][]byte {
Expand Down
5 changes: 2 additions & 3 deletions mesh/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

type GossipData interface {
Encode() [][]byte
Merge(GossipData)
Merge(GossipData) GossipData
}

type Gossip interface {
Expand Down Expand Up @@ -79,8 +79,7 @@ func (sender *GossipSender) Send(data GossipData) {
// NB: this must not be invoked concurrently
select {
case pending := <-sender.cell:
pending.Merge(data)
sender.cell <- pending
sender.cell <- pending.Merge(data)
default:
sender.cell <- data
}
Expand Down
16 changes: 9 additions & 7 deletions mesh/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,19 @@ func (router *Router) BroadcastTopologyUpdate(update []*Peer) {
for _, p := range update {
names[p.Name] = void
}

router.TopologyGossip.GossipBroadcast(&TopologyGossipData{
peers: router.Peers,
update: names,
})
router.TopologyGossip.GossipBroadcast(
&TopologyGossipData{peers: router.Peers, update: names})
}

func (d *TopologyGossipData) Merge(other GossipData) {
func (d *TopologyGossipData) Merge(other GossipData) GossipData {
names := make(PeerNameSet)
for name := range d.update {
names[name] = void
}
for name := range other.(*TopologyGossipData).update {
d.update[name] = void
names[name] = void
}
return &TopologyGossipData{peers: d.peers, update: names}
}

func (d *TopologyGossipData) Encode() [][]byte {
Expand Down
8 changes: 6 additions & 2 deletions mesh/surrogate_gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ func (d *SurrogateGossipData) Encode() [][]byte {
return d.messages
}

func (d *SurrogateGossipData) Merge(other GossipData) {
d.messages = append(d.messages, other.(*SurrogateGossipData).messages...)
func (d *SurrogateGossipData) Merge(other GossipData) GossipData {
o := other.(*SurrogateGossipData)
messages := make([][]byte, 0, len(d.messages)+len(o.messages))
messages = append(messages, d.messages...)
messages = append(messages, o.messages...)
return &SurrogateGossipData{messages: messages}
}

// SurrogateGossiper ignores unicasts and relays broadcasts and gossips.
Expand Down
20 changes: 13 additions & 7 deletions nameserver/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,14 @@ type GossipData struct {
Entries
}

func (g *GossipData) Merge(o mesh.GossipData) {
func (g *GossipData) Merge(o mesh.GossipData) mesh.GossipData {
other := o.(*GossipData)
g.Entries.merge(other.Entries)
if g.Timestamp < other.Timestamp {
g.Timestamp = other.Timestamp
gossip := g.copy()
gossip.Entries.merge(other.Entries)
if gossip.Timestamp < other.Timestamp {
gossip.Timestamp = other.Timestamp
}
return gossip
}

func (g *GossipData) Decode(msg []byte) error {
Expand All @@ -264,13 +266,17 @@ func (g *GossipData) Decode(msg []byte) error {
}

func (g *GossipData) Encode() [][]byte {
// Make a copy so we can sort: all outgoing data is sent in case-sensitive order
g2 := GossipData{Timestamp: g.Timestamp, Entries: make(Entries, len(g.Entries))}
copy(g2.Entries, g.Entries)
g2 := g.copy()
sort.Sort(CaseSensitive(g2.Entries))
buf := &bytes.Buffer{}
if err := gob.NewEncoder(buf).Encode(g2); err != nil {
panic(err)
}
return [][]byte{buf.Bytes()}
}

func (g *GossipData) copy() *GossipData {
g2 := &GossipData{Timestamp: g.Timestamp, Entries: make(Entries, len(g.Entries))}
copy(g2.Entries, g.Entries)
return g2
}
4 changes: 2 additions & 2 deletions nameserver/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestGossipDataMerge(t *testing.T) {
g1 := GossipData{Entries: makeEntries("AcDf")}
g2 := GossipData{Entries: makeEntries("BEf")}

g1.Merge(&g2)
g3 := g1.Merge(&g2).(*GossipData)

require.Equal(t, GossipData{Entries: makeEntries("ABcDEf")}, g1)
require.Equal(t, GossipData{Entries: makeEntries("ABcDEf")}, *g3)
}

0 comments on commit f25af8f

Please sign in to comment.