Skip to content

Commit

Permalink
Merge a0468ac into a4d8488
Browse files Browse the repository at this point in the history
  • Loading branch information
thanodnl committed Dec 16, 2016
2 parents a4d8488 + a0468ac commit 23c8f1e
Show file tree
Hide file tree
Showing 5 changed files with 502 additions and 52 deletions.
21 changes: 21 additions & 0 deletions membership/events.go
@@ -0,0 +1,21 @@
package membership

// MemberChange shows the state before and after the change of a Member
type MemberChange struct {
// Before is the state of the member before the change, if the
// member is a new member the before state is nil
Before Member
// After is the state of the member after the change, if the
// member left the after state will be nil
After Member
}

// ChangeEvent indicates that the membership has changed. The event will contain
// a list of changes that will show both the old and the new state of a member.
// It is not guaranteed that any of the observable state of a member has in fact
// changed, it might only be an interal state change for the underlying
// membership.
type ChangeEvent struct {
// Changes is a slice of changes that is related to this event
Changes []MemberChange
}
19 changes: 19 additions & 0 deletions membership/interface.go
@@ -0,0 +1,19 @@
package membership

// Member defines a member of the membership. It can be used by applications to
// apply specific business logic on Members. Examples are:
// - Get the address of a member for RPC calls, both forwarding of internal
// calls that should target a Member
// - Decissions to include a Member in a query via predicates.
type Member interface {
// GetAddress returns the external address used by the rpc layer to
// communicate to the member.
//
// Note: It is prefixed with Get for legacy reasons and can be removed after
// a refactor of the swim.Member to free up the `Address` name.
GetAddress() string

// Label reads the label for a given key from the member. It also returns
// wether or not the label was present on the member
Label(key string) (value string, has bool)
}
24 changes: 24 additions & 0 deletions swim/member.go
Expand Up @@ -68,6 +68,18 @@ type Member struct {
// string to string mappings containing user data that is gossiped around in SWIM.
type LabelMap map[string]string

// GetAddress returns the Address of a member.
func (m Member) GetAddress() string {
return m.Address
}

// Label returns the value of a label named by key. The `has` boolean indicates
// if the label was set on the member or not
func (m Member) Label(key string) (value string, has bool) {
value, has = m.Labels[key]
return
}

// suspect interface
func (m Member) address() string {
return m.Address
Expand All @@ -93,6 +105,18 @@ func (m Member) checksumString(b *bytes.Buffer) {
m.Labels.checksumString(b)
}

// copy creates a non-nil version of the LabelMap that copies all existing
// entries of the map. This can be used to create a new version of the labels
// that can be mutated before putting it on a Member to make updates without
// mutating the map that was already on a Member
func (l LabelMap) copy() (result LabelMap) {
result = make(map[string]string, len(l))
for key, value := range l {
result[key] = value
}
return
}

// checksumString adds the label portion of the checksum to the buffer that is
// passed in. The string will not be appended in the case where labels are not
// set on this member. This is for backwards compatibility reasons with older
Expand Down
162 changes: 111 additions & 51 deletions swim/memberlist.go
Expand Up @@ -28,6 +28,8 @@ import (
"sync"
"time"

"github.com/uber/ringpop-go/membership"

"github.com/benbjohnson/clock"
"github.com/dgryski/go-farm"
"github.com/uber-common/bark"
Expand Down Expand Up @@ -291,10 +293,10 @@ func (m *memberlist) MakeFaulty(address string, incarnation int64) []Change {
}

func (m *memberlist) SetLocalStatus(status string) {
m.members.Lock()
m.local.Status = status
m.members.Unlock()
m.postLocalUpdate()
m.updateLocalMember(func(member *Member) bool {
member.Status = status
return true
})
}

// SetLocalLabel sets the label identified by key to the new value. This
Expand Down Expand Up @@ -345,32 +347,31 @@ func (m *memberlist) SetLocalLabels(labels map[string]string) error {
return err
}

m.members.Lock()
// ensure that there is a labels map
if m.local.Labels == nil {
m.local.Labels = make(map[string]string, len(labels))
}
m.updateLocalMember(func(member *Member) bool {
// ensure that there is a new copy of the labels to work with.
labelsCopy := member.Labels.copy()

// keep track if we made changes to the labels
changes := false
// keep track if we made changes to the labels
changes := false

// copy the key-value pairs to our internal labels. By not setting the map
// of labels to the Labels value of the local member we prevent removing labels
// that the user did not specify in the new map.
for key, value := range labels {
old, had := m.local.Labels[key]
m.local.Labels[key] = value
// copy the key-value pairs to our internal labels. By not setting the map
// of labels to the Labels value of the local member we prevent removing labels
// that the user did not specify in the new map.
for key, value := range labels {
old, had := labelsCopy[key]
labelsCopy[key] = value

if !had || old != value {
changes = true
if !had || old != value {
changes = true
}
}
}

m.members.Unlock()

if changes {
m.postLocalUpdate()
}
if changes {
// only if there are changes we put the copied labels on the member.
member.Labels = labelsCopy
}
return changes
})

return nil
}
Expand All @@ -381,47 +382,81 @@ func (m *memberlist) SetLocalLabels(labels map[string]string) error {
// and subsequently be gossiped around. It is a valid operation to remove non-
// existing keys. It returns true if all (and only all) labels have been removed.
func (m *memberlist) RemoveLocalLabels(keys ...string) bool {
m.members.Lock()
if len(m.local.Labels) == 0 || len(keys) == 0 {
m.members.Unlock()
// nothing to delete
return false
}
// keep track if all labels are removed, it will be set to false if a label
// couldn't be removed.
removed := true

m.updateLocalMember(func(member *Member) bool {
// ensure that there is a new copy of the labels to work
// with.
labelsCopy := member.Labels.copy()

any := false // keep track if we at least removed one label
for _, key := range keys {
_, has := labelsCopy[key]
delete(labelsCopy, key)
removed = removed && has
any = any || has
}

any := false // keep track if we at least removed one label
removed := true // keep track if all labels are removed
for _, key := range keys {
_, has := m.local.Labels[key]
delete(m.local.Labels, key)
removed = removed && has
any = any || has
}
if any {
// only if there are changes we put the copied labels on the member.
member.Labels = labelsCopy
}

m.members.Unlock()
return any
})

if any {
// only reincarnate if there is a label removed
m.postLocalUpdate()
}
return removed
}

// postLocalUpdate should be called after the local Member has been updated to
// make sure that its new state has a higher incarnation number and the change
// will be recorded as a change to gossip around.
func (m *memberlist) postLocalUpdate() {
// bump our incarnation for this change to be accepted by all peers
// updateLocalMember takes an update function to upate the member passed in. The
// update function can make mutations to the member and should indicate if it
// made changes, only if changes are made the incarnation number will be bumped
// and the new state will be gossiped to the peers
func (m *memberlist) updateLocalMember(update func(*Member) bool) {
m.members.Lock()

before := *m.local
didUpdate := update(m.local)

// exit if the update didn't change anything
if !didUpdate {
m.members.Unlock()
return
}

// bump incarnation number if the member has been updated
change := m.bumpIncarnation()

changes := []Change{change}

after := *m.local

m.members.Unlock()

// since we changed our local state we need to update our checksum
m.ComputeChecksum()

changes := []Change{change}

// kick in our updating mechanism
m.node.handleChanges(changes)

// prepare a membership change event for observable state changes
var memberChange membership.MemberChange
if before.isReachable() {
memberChange.Before = before
}
if after.isReachable() {
memberChange.After = after
}

if memberChange.Before != nil || memberChange.After != nil {
m.node.EmitEvent(membership.ChangeEvent{
Changes: []membership.MemberChange{
memberChange,
},
})
}
}

// MakeTombstone declares the node with the provided address in the tombstone state
Expand Down Expand Up @@ -492,6 +527,8 @@ func (m *memberlist) Update(changes []Change) (applied []Change) {

m.node.EmitEvent(MemberlistChangesReceivedEvent{changes})

var memberChanges []membership.MemberChange

m.Lock()

// run through all changes received and figure out if they need to be accepted
Expand All @@ -517,6 +554,20 @@ func (m *memberlist) Update(changes []Change) (applied []Change) {
} else {
// otherwise it can be applied to the memberlist

// prepare the change and collect if there is an outside
// observable change eg. changes that involve active
// participants of the membership (pingable)
memberChange := membership.MemberChange{}
if has && member.isReachable() {
memberChange.Before = *member
}
if gossip.isReachable() {
memberChange.After = gossip
}
if memberChange.Before != nil || memberChange.After != nil {
memberChanges = append(memberChanges, memberChange)
}

if !has {
// if the member was not already present in the list we will
// add it and assign it a random position in the list to ensure
Expand Down Expand Up @@ -561,6 +612,15 @@ func (m *memberlist) Update(changes []Change) (applied []Change) {
NumMembers: m.NumMembers(),
})
m.node.handleChanges(applied)

}

// if there are changes that are important for outside observers of the
// membership emit those
if len(memberChanges) > 0 {
m.node.EmitEvent(membership.ChangeEvent{
Changes: memberChanges,
})
}

m.Unlock()
Expand Down

0 comments on commit 23c8f1e

Please sign in to comment.