Skip to content

Commit

Permalink
[Feature] Identity carry over part 2 / Membership package (#189)
Browse files Browse the repository at this point in the history
This PR is part 2 for identity carry over.

In this PR we introduce a stricter interface for what a `Membership` is in a `membership` package. The membership defines an 'observable membership' for the application. Only containing members that are an active part of the membership eg. No `faulty`, `leave` or `tombstone` members will be observable.

It introduces what a `Member` is via an interface and makes sure that the `swim.Member` implements it.

Lastly it adds an event that is triggered when changes happen to the membership. This event is different from the swim event that contains changes in that it contains the `Before` and `After` state of a `Member` that can be used by the application to respond to.
  • Loading branch information
thanodnl authored and Menno Pruijssers committed Dec 19, 2016
1 parent b782030 commit 561b183
Show file tree
Hide file tree
Showing 5 changed files with 502 additions and 52 deletions.
21 changes: 21 additions & 0 deletions membership/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package membership

// MemberChange shows the state before and after the change of a Member
type MemberChange struct {
// Before is the state of the member before the change, if the
// member is a new member the before state is nil
Before Member
// After is the state of the member after the change, if the
// member left the after state will be nil
After Member
}

// ChangeEvent indicates that the membership has changed. The event will contain
// a list of changes that will show both the old and the new state of a member.
// It is not guaranteed that any of the observable state of a member has in fact
// changed, it might only be an interal state change for the underlying
// membership.
type ChangeEvent struct {
// Changes is a slice of changes that is related to this event
Changes []MemberChange
}
19 changes: 19 additions & 0 deletions membership/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package membership

// Member defines a member of the membership. It can be used by applications to
// apply specific business logic on Members. Examples are:
// - Get the address of a member for RPC calls, both forwarding of internal
// calls that should target a Member
// - Decissions to include a Member in a query via predicates.
type Member interface {
// GetAddress returns the external address used by the rpc layer to
// communicate to the member.
//
// Note: It is prefixed with Get for legacy reasons and can be removed after
// a refactor of the swim.Member to free up the `Address` name.
GetAddress() string

// Label reads the label for a given key from the member. It also returns
// wether or not the label was present on the member
Label(key string) (value string, has bool)
}
24 changes: 24 additions & 0 deletions swim/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ type Member struct {
// string to string mappings containing user data that is gossiped around in SWIM.
type LabelMap map[string]string

// GetAddress returns the Address of a member.
func (m Member) GetAddress() string {
return m.Address
}

// Label returns the value of a label named by key. The `has` boolean indicates
// if the label was set on the member or not
func (m Member) Label(key string) (value string, has bool) {
value, has = m.Labels[key]
return
}

// suspect interface
func (m Member) address() string {
return m.Address
Expand All @@ -93,6 +105,18 @@ func (m Member) checksumString(b *bytes.Buffer) {
m.Labels.checksumString(b)
}

// copy creates a non-nil version of the LabelMap that copies all existing
// entries of the map. This can be used to create a new version of the labels
// that can be mutated before putting it on a Member to make updates without
// mutating the map that was already on a Member
func (l LabelMap) copy() (result LabelMap) {
result = make(map[string]string, len(l))
for key, value := range l {
result[key] = value
}
return
}

// checksumString adds the label portion of the checksum to the buffer that is
// passed in. The string will not be appended in the case where labels are not
// set on this member. This is for backwards compatibility reasons with older
Expand Down
162 changes: 111 additions & 51 deletions swim/memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync"
"time"

"github.com/uber/ringpop-go/membership"

"github.com/benbjohnson/clock"
"github.com/dgryski/go-farm"
"github.com/uber-common/bark"
Expand Down Expand Up @@ -291,10 +293,10 @@ func (m *memberlist) MakeFaulty(address string, incarnation int64) []Change {
}

func (m *memberlist) SetLocalStatus(status string) {
m.members.Lock()
m.local.Status = status
m.members.Unlock()
m.postLocalUpdate()
m.updateLocalMember(func(member *Member) bool {
member.Status = status
return true
})
}

// SetLocalLabel sets the label identified by key to the new value. This
Expand Down Expand Up @@ -345,32 +347,31 @@ func (m *memberlist) SetLocalLabels(labels map[string]string) error {
return err
}

m.members.Lock()
// ensure that there is a labels map
if m.local.Labels == nil {
m.local.Labels = make(map[string]string, len(labels))
}
m.updateLocalMember(func(member *Member) bool {
// ensure that there is a new copy of the labels to work with.
labelsCopy := member.Labels.copy()

// keep track if we made changes to the labels
changes := false
// keep track if we made changes to the labels
changes := false

// copy the key-value pairs to our internal labels. By not setting the map
// of labels to the Labels value of the local member we prevent removing labels
// that the user did not specify in the new map.
for key, value := range labels {
old, had := m.local.Labels[key]
m.local.Labels[key] = value
// copy the key-value pairs to our internal labels. By not setting the map
// of labels to the Labels value of the local member we prevent removing labels
// that the user did not specify in the new map.
for key, value := range labels {
old, had := labelsCopy[key]
labelsCopy[key] = value

if !had || old != value {
changes = true
if !had || old != value {
changes = true
}
}
}

m.members.Unlock()

if changes {
m.postLocalUpdate()
}
if changes {
// only if there are changes we put the copied labels on the member.
member.Labels = labelsCopy
}
return changes
})

return nil
}
Expand All @@ -381,47 +382,81 @@ func (m *memberlist) SetLocalLabels(labels map[string]string) error {
// and subsequently be gossiped around. It is a valid operation to remove non-
// existing keys. It returns true if all (and only all) labels have been removed.
func (m *memberlist) RemoveLocalLabels(keys ...string) bool {
m.members.Lock()
if len(m.local.Labels) == 0 || len(keys) == 0 {
m.members.Unlock()
// nothing to delete
return false
}
// keep track if all labels are removed, it will be set to false if a label
// couldn't be removed.
removed := true

m.updateLocalMember(func(member *Member) bool {
// ensure that there is a new copy of the labels to work
// with.
labelsCopy := member.Labels.copy()

any := false // keep track if we at least removed one label
for _, key := range keys {
_, has := labelsCopy[key]
delete(labelsCopy, key)
removed = removed && has
any = any || has
}

any := false // keep track if we at least removed one label
removed := true // keep track if all labels are removed
for _, key := range keys {
_, has := m.local.Labels[key]
delete(m.local.Labels, key)
removed = removed && has
any = any || has
}
if any {
// only if there are changes we put the copied labels on the member.
member.Labels = labelsCopy
}

m.members.Unlock()
return any
})

if any {
// only reincarnate if there is a label removed
m.postLocalUpdate()
}
return removed
}

// postLocalUpdate should be called after the local Member has been updated to
// make sure that its new state has a higher incarnation number and the change
// will be recorded as a change to gossip around.
func (m *memberlist) postLocalUpdate() {
// bump our incarnation for this change to be accepted by all peers
// updateLocalMember takes an update function to upate the member passed in. The
// update function can make mutations to the member and should indicate if it
// made changes, only if changes are made the incarnation number will be bumped
// and the new state will be gossiped to the peers
func (m *memberlist) updateLocalMember(update func(*Member) bool) {
m.members.Lock()

before := *m.local
didUpdate := update(m.local)

// exit if the update didn't change anything
if !didUpdate {
m.members.Unlock()
return
}

// bump incarnation number if the member has been updated
change := m.bumpIncarnation()

changes := []Change{change}

after := *m.local

m.members.Unlock()

// since we changed our local state we need to update our checksum
m.ComputeChecksum()

changes := []Change{change}

// kick in our updating mechanism
m.node.handleChanges(changes)

// prepare a membership change event for observable state changes
var memberChange membership.MemberChange
if before.isReachable() {
memberChange.Before = before
}
if after.isReachable() {
memberChange.After = after
}

if memberChange.Before != nil || memberChange.After != nil {
m.node.EmitEvent(membership.ChangeEvent{
Changes: []membership.MemberChange{
memberChange,
},
})
}
}

// MakeTombstone declares the node with the provided address in the tombstone state
Expand Down Expand Up @@ -492,6 +527,8 @@ func (m *memberlist) Update(changes []Change) (applied []Change) {

m.node.EmitEvent(MemberlistChangesReceivedEvent{changes})

var memberChanges []membership.MemberChange

m.Lock()

// run through all changes received and figure out if they need to be accepted
Expand All @@ -517,6 +554,20 @@ func (m *memberlist) Update(changes []Change) (applied []Change) {
} else {
// otherwise it can be applied to the memberlist

// prepare the change and collect if there is an outside
// observable change eg. changes that involve active
// participants of the membership (pingable)
memberChange := membership.MemberChange{}
if has && member.isReachable() {
memberChange.Before = *member
}
if gossip.isReachable() {
memberChange.After = gossip
}
if memberChange.Before != nil || memberChange.After != nil {
memberChanges = append(memberChanges, memberChange)
}

if !has {
// if the member was not already present in the list we will
// add it and assign it a random position in the list to ensure
Expand Down Expand Up @@ -561,6 +612,15 @@ func (m *memberlist) Update(changes []Change) (applied []Change) {
NumMembers: m.NumMembers(),
})
m.node.handleChanges(applied)

}

// if there are changes that are important for outside observers of the
// membership emit those
if len(memberChanges) > 0 {
m.node.EmitEvent(membership.ChangeEvent{
Changes: memberChanges,
})
}

m.Unlock()
Expand Down
Loading

0 comments on commit 561b183

Please sign in to comment.